You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/08/16 17:26:46 UTC

[1/5] git commit: Revert "Make sure the mesos-fetcher exits if the slave terminates."

Repository: mesos
Updated Branches:
  refs/heads/master 0a2957ed0 -> 8cbb85c8a


Revert "Make sure the mesos-fetcher exits if the slave terminates."

This reverts commit f66fa52e7efd9c10f9256805e45095591d4833a7.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6d502757
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6d502757
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6d502757

Branch: refs/heads/master
Commit: 6d5027573cc4e88ebb92b3ec7c6299516178f0aa
Parents: 0a2957e
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Aug 15 19:53:21 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Aug 15 19:53:21 2014 -0700

----------------------------------------------------------------------
 src/launcher/fetcher.cpp                        | 46 --------------------
 src/slave/containerizer/mesos/containerizer.cpp |  2 -
 2 files changed, 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6d502757/src/launcher/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp
index 1e3d516..50e9918 100644
--- a/src/launcher/fetcher.cpp
+++ b/src/launcher/fetcher.cpp
@@ -20,8 +20,6 @@
 
 #include <mesos/mesos.hpp>
 
-#include <process/io.hpp>
-
 #include <stout/net.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
@@ -193,54 +191,10 @@ Try<string> fetch(
 }
 
 
-// A helper function for abnormally cancelling the fetching because
-// our parent has died (e.g., the slave).
-process::Future<Nothing> cancel()
-{
-  // We don't easily have a handle on any of the children we've
-  // potentially started since they're hidden behind os::system,
-  // net::download, HDFS, etc, so we just do a killtree on all of our
-  // children.
-  //
-  // TODO(benh): This still isn't sufficient because we might be in
-  // the middle of forking a process. What we really need to do is run
-  // os::kiltree "outside" of this process so that we can pause this
-  // process too!
-  Try<os::ProcessTree> pstree = os::pstree(0);
-
-  if (pstree.isSome() && !pstree.get().children.empty()) {
-    foreach (const os::ProcessTree& child, pstree.get().children) {
-      // NOTE: We don't follow groups or sessions because it's
-      // possible we'll end up killing ourselves, or worse, the slave!
-      os::killtree(child.process.pid, 9);
-    }
-  }
-
-  EXIT(1) << "Cancelled fetching because stdin was closed "
-          << "(e.g., because the parent has exited)";
-
-  return Nothing();
-}
-
-
 int main(int argc, char* argv[])
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
 
-  // The current semantics of the mesos-fetcher is that it should
-  // terminate if/when its parent terminates. To support this, we read
-  // from stdin and if/when we get back an EOF then we "cancel" any
-  // fetching and exit so we don't become an orphan (which would be
-  // especially bad in the event calling something like HDFS ends up
-  // hung indefinitely).
-  //
-  // TODO(benh): Introduce a timeout for fetching each URI that can be
-  // set via flags on the slave.
-  //
-  // TODO(benh): Introduce a flag here for changing these semantics.
-  process::io::read(STDIN_FILENO)
-    .then(lambda::bind(&cancel));
-
   CommandInfo commandInfo;
   // Construct URIs from the encoded environment string.
   const std::string& uris = os::getenv("MESOS_EXECUTOR_URIS");

http://git-wip-us.apache.org/repos/asf/mesos/blob/6d502757/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index cdf440d..d0676c5 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -502,8 +502,6 @@ Future<Nothing> MesosContainerizerProcess::fetch(
   LOG(INFO) << "Fetching URIs for container '" << containerId
             << "' using command '" << command << "'";
 
-  // NOTE: It's important that we create a pipe for the mesos-fetcher
-  // stdin so that when the slave exits it will terminate itself.
   Try<Subprocess> fetcher = subprocess(
       command,
       Subprocess::PIPE(),


[2/5] git commit: Set ownership of stdout/stderr and container directory properly.

Posted by be...@apache.org.
Set ownership of stdout/stderr and container directory properly.

Review: https://reviews.apache.org/r/24766


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/337e9558
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/337e9558
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/337e9558

Branch: refs/heads/master
Commit: 337e9558307c6799d46fee2d9ff738126d36163f
Parents: fd55381
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Aug 15 21:35:33 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Aug 16 08:25:39 2014 -0700

----------------------------------------------------------------------
 src/slave/containerizer/docker.cpp | 48 ++++++++++++++++++++++++++++++---
 1 file changed, 45 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/337e9558/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index ced0f92..5fa0275 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -390,8 +390,6 @@ Future<Nothing> DockerContainerizerProcess::fetch(
   VLOG(1) << "Starting to fetch URIs for container: " << containerId
           << ", directory: " << directory;
 
-  // NOTE: It's important that we create a pipe for the mesos-fetcher
-  // stdin so that when the slave exits it will terminate itself.
   Try<Subprocess> fetcher = subprocess(
       realpath.get(),
       Subprocess::PIPE(),
@@ -777,13 +775,35 @@ Future<bool> DockerContainerizerProcess::launch(
     return false;
   }
 
-  ContainerInfo containerInfo = executorInfo.container();
+  ContainerInfo containerInfo = taskInfo.container();
 
   if (containerInfo.type() != ContainerInfo::DOCKER) {
     LOG(INFO) << "Skipping non-docker container";
     return false;
   }
 
+  // Before we do anything else we first make sure the stdout/stderr
+  // files exist and have the right file ownership.
+  Try<Nothing> touch = os::touch(path::join(directory, "stdout"));
+
+  if (touch.isError()) {
+    return Failure("Failed to touch 'stdout': " + touch.error());
+  }
+
+  touch = os::touch(path::join(directory, "stderr"));
+
+  if (touch.isError()) {
+    return Failure("Failed to touch 'stderr': " + touch.error());
+  }
+
+  if (user.isSome()) {
+    Try<Nothing> chown = os::chown(user.get(), directory, true);
+
+    if (chown.isError()) {
+      return Failure("Failed to chown: " + chown.error());
+    }
+  }
+
   LOG(INFO) << "Starting container '" << containerId
             << "' for task '" << taskInfo.task_id()
             << "' (and executor '" << executorInfo.executor_id()
@@ -1003,6 +1023,28 @@ Future<bool> DockerContainerizerProcess::launch(
     return false;
   }
 
+  // Before we do anything else we first make sure the stdout/stderr
+  // files exist and have the right file ownership.
+  Try<Nothing> touch = os::touch(path::join(directory, "stdout"));
+
+  if (touch.isError()) {
+    return Failure("Failed to touch 'stdout': " + touch.error());
+  }
+
+  touch = os::touch(path::join(directory, "stderr"));
+
+  if (touch.isError()) {
+    return Failure("Failed to touch 'stderr': " + touch.error());
+  }
+
+  if (user.isSome()) {
+    Try<Nothing> chown = os::chown(user.get(), directory, true);
+
+    if (chown.isError()) {
+      return Failure("Failed to chown: " + chown.error());
+    }
+  }
+
   LOG(INFO) << "Starting container '" << containerId
             << "' for executor '" << executorInfo.executor_id()
             << "' and framework '" << executorInfo.framework_id() << "'";


[5/5] git commit: Save Docker container pid for subsequent containerizer updates.

Posted by be...@apache.org.
Save Docker container pid for subsequent containerizer updates.

Review: https://reviews.apache.org/r/24768


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8cbb85c8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8cbb85c8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8cbb85c8

Branch: refs/heads/master
Commit: 8cbb85c8af6eae9453a868f67f2fb8dd387e18ba
Parents: a7b706a
Author: Timothy Chen <tn...@apache.org>
Authored: Sat Aug 16 07:53:55 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Aug 16 08:25:40 2014 -0700

----------------------------------------------------------------------
 src/slave/containerizer/docker.cpp       | 55 ++++++++++++++++++++++-----
 src/tests/docker_containerizer_tests.cpp | 20 ++++++++++
 2 files changed, 66 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbb85c8/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 5fa0275..215c2b4 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -227,6 +227,11 @@ private:
       const Resources& resources,
       const Docker::Container& container);
 
+  process::Future<Nothing> __update(
+      const ContainerID& containerId,
+      const Resources& resources,
+      pid_t pid);
+
   Future<ResourceStatistics> _usage(
     const ContainerID& containerId,
     const Docker::Container& container);
@@ -301,6 +306,10 @@ private:
     // The docker pull subprocess is stored so we can killtree the
     // pid when destroy is called while docker is pulling the image.
     Option<Subprocess> pull;
+
+    // Once the container is running, this saves the pid of the
+    // running container.
+    Option<pid_t> pid;
   };
 
   hashmap<ContainerID, Container*> containers_;
@@ -1237,8 +1246,16 @@ Future<Nothing> DockerContainerizerProcess::update(
     return Nothing();
   }
 
+  Container* container = containers_[containerId];
+
+  if (container->state == Container::DESTROYING)  {
+    LOG(INFO) << "Ignoring updating container '" << containerId
+              << "' that is being destroyed";
+    return Nothing();
+  }
+
   // Store the resources for usage().
-  containers_[containerId]->resources = _resources;
+  container->resources = _resources;
 
 #ifdef __linux__
   if (!_resources.cpus().isSome() && !_resources.mem().isSome()) {
@@ -1246,6 +1263,11 @@ Future<Nothing> DockerContainerizerProcess::update(
     return Nothing();
   }
 
+  // Skip inspecting the docker container if we already have the pid.
+  if (container->pid.isSome()) {
+    return __update(containerId, _resources, container->pid.get());
+  }
+
   return docker.inspect(containerName(containerId))
     .then(defer(self(), &Self::_update, containerId, _resources, lambda::_1));
 #else
@@ -1259,6 +1281,27 @@ Future<Nothing> DockerContainerizerProcess::_update(
     const Resources& _resources,
     const Docker::Container& container)
 {
+  if (container.pid.isNone()) {
+    return Nothing();
+  }
+
+  if (!containers_.contains(containerId)) {
+    LOG(INFO) << "Container has been removed after docker inspect, "
+              << "skipping update";
+    return Nothing();
+  }
+
+  containers_[containerId]->pid = container.pid.get();
+
+  return __update(containerId, _resources, container.pid.get());
+}
+
+
+Future<Nothing> DockerContainerizerProcess::__update(
+    const ContainerID& containerId,
+    const Resources& _resources,
+    pid_t pid)
+{
 #ifdef __linux__
   // Determine the the cgroups hierarchies where the 'cpu' and
   // 'memory' subsystems are mounted (they may be the same). Note that
@@ -1284,15 +1327,9 @@ Future<Nothing> DockerContainerizerProcess::_update(
   // the hierarchy with the 'memory' subsystem attached so we can
   // update the proper cgroup control files.
 
-  // First check that this container still appears to be running.
-  Option<pid_t> pid = container.pid;
-  if (pid.isNone()) {
-    return Nothing();
-  }
-
   // Determine the cgroup for the 'cpu' subsystem (based on the
   // container's pid).
-  Result<string> cpuCgroup = cgroups::cpu::cgroup(pid.get());
+  Result<string> cpuCgroup = cgroups::cpu::cgroup(pid);
 
   if (cpuCgroup.isError()) {
     return Failure("Failed to determine cgroup for the 'cpu' subsystem: " +
@@ -1325,7 +1362,7 @@ Future<Nothing> DockerContainerizerProcess::_update(
   }
 
   // Now determine the cgroup for the 'memory' subsystem.
-  Result<string> memoryCgroup = cgroups::memory::cgroup(pid.get());
+  Result<string> memoryCgroup = cgroups::memory::cgroup(pid);
 
   if (memoryCgroup.isError()) {
     return Failure("Failed to determine cgroup for the 'memory' subsystem: " +

http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbb85c8/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index 3a55f5e..c37bc52 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -734,6 +734,26 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
   EXPECT_EQ(1024u, cpu.get());
   EXPECT_EQ(128u, mem.get().megabytes());
 
+  newResources = Resources::parse("cpus:1;mem:144");
+
+  // Issue second update that uses the cached pid instead of inspect.
+  update = dockerContainerizer.update(containerId.get(), newResources.get());
+
+  AWAIT_READY(update);
+
+  cpu = cgroups::cpu::shares(cpuHierarchy.get(), cpuCgroup.get());
+
+  ASSERT_SOME(cpu);
+
+  mem = cgroups::memory::soft_limit_in_bytes(
+      memoryHierarchy.get(),
+      memoryCgroup.get());
+
+  ASSERT_SOME(mem);
+
+  EXPECT_EQ(1024u, cpu.get());
+  EXPECT_EQ(144u, mem.get().megabytes());
+
   driver.stop();
   driver.join();
 


[3/5] git commit: Added some slave recovery DockerContainerizer tests.

Posted by be...@apache.org.
Added some slave recovery DockerContainerizer tests.

Review: https://reviews.apache.org/r/24765


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fd553815
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fd553815
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fd553815

Branch: refs/heads/master
Commit: fd55381510c96ae7bb48b36cae2bcc329d535382
Parents: 6d50275
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Aug 15 20:56:17 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Aug 16 08:25:39 2014 -0700

----------------------------------------------------------------------
 src/tests/docker_containerizer_tests.cpp | 364 +++++++++++++++++++++++++-
 1 file changed, 354 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fd553815/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index 0d7c3b1..3a55f5e 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -20,14 +20,18 @@
 #include <gtest/gtest.h>
 
 #include <process/future.hpp>
+#include <process/gmock.hpp>
 #include <process/subprocess.hpp>
 
 #include "linux/cgroups.hpp"
 
+#include "messages/messages.hpp"
+
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
 
 #include "slave/containerizer/docker.hpp"
+#include "slave/paths.hpp"
 #include "slave/slave.hpp"
 #include "slave/state.hpp"
 
@@ -43,7 +47,9 @@ using mesos::internal::slave::Slave;
 using mesos::internal::slave::DockerContainerizer;
 
 using process::Future;
+using process::Message;
 using process::PID;
+using process::UPID;
 
 using std::vector;
 using std::list;
@@ -51,6 +57,7 @@ using std::string;
 
 using testing::_;
 using testing::DoDefault;
+using testing::Eq;
 using testing::Invoke;
 using testing::Return;
 
@@ -189,7 +196,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -299,7 +306,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -390,7 +397,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -618,7 +625,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -855,7 +862,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -962,7 +969,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -1057,7 +1064,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
 }
 
 
-// The following test uses a docker image (mesosphere/inky) that has
+// The following test uses a Docker image (mesosphere/inky) that has
 // an entrypoint "echo" and a default command "inky".
 TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
 {
@@ -1070,7 +1077,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -1170,7 +1177,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
 }
 
 
-// The following test uses a docker image (mesosphere/inky) that has
+// The following test uses a Docker image (mesosphere/inky) that has
 // an entrypoint "echo" and a default command "inky".
 TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
 {
@@ -1183,7 +1190,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -1282,3 +1289,340 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
 
   Shutdown();
 }
+
+
+// The slave is stopped before the first update for a task is received
+// from the executor. When it comes back up we make sure the executor
+// re-registers and the slave properly sends the update.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  // Set up recovery slave flags.
+  flags.checkpoint = true;
+  flags.recover = "reconnect";
+  flags.strict = true;
+
+  Docker docker = Docker::create(tests::flags.docker, false).get();
+
+  // We put the containerizer on the heap so we can more easily
+  // control its lifetime, i.e., when we invoke the destructor.
+  MockDockerContainerizer* dockerContainerizer1 =
+    new MockDockerContainerizer(flags, docker);
+
+  Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
+  ASSERT_SOME(slave1);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(*dockerContainerizer1, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(dockerContainerizer1,
+                           &MockDockerContainerizer::_launch)));
+
+  // Drop the first update from the executor.
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, _);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(containerId);
+
+  // Stop the slave before the status update is received.
+  AWAIT_READY(statusUpdateMessage);
+
+  Stop(slave1.get());
+
+  delete dockerContainerizer1;
+
+  Future<Message> reregisterExecutorMessage =
+    FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  MockDockerContainerizer* dockerContainerizer2 =
+    new MockDockerContainerizer(flags, docker);
+
+  Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
+  ASSERT_SOME(slave2);
+
+  // Ensure the executor re-registers.
+  AWAIT_READY(reregisterExecutorMessage);
+  UPID executorPid = reregisterExecutorMessage.get().from;
+
+  ReregisterExecutorMessage reregister;
+  reregister.ParseFromString(reregisterExecutorMessage.get().body);
+
+  // Executor should inform about the unacknowledged update.
+  ASSERT_EQ(1, reregister.updates_size());
+  const StatusUpdate& update = reregister.updates(0);
+  ASSERT_EQ(task.task_id(), update.status().task_id());
+  ASSERT_EQ(TASK_RUNNING, update.status().state());
+
+  // Scheduler should receive the recovered update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_RUNNING, status.get().state());
+
+  // Make sure the container is still running.
+  Future<list<Docker::Container> > containers =
+    docker.ps(true, slave::DOCKER_NAME_PREFIX);
+
+  AWAIT_READY(containers);
+
+  ASSERT_TRUE(exists(containers.get(), containerId.get()));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+
+  delete dockerContainerizer2;
+}
+
+
+// The slave is stopped before the first update for a task is received
+// from the executor. When it comes back up we make sure the executor
+// re-registers and the slave properly sends the update.
+//
+// TODO(benh): This test is currently disabled because the executor
+// inside the image mesosphere/test-executor does not properly set the
+// executor PID that it uses during registration, so when the new
+// slave recovers it can't reconnect and instead destroys that
+// container. In particular, it uses '0' for its IP which we properly
+// parse and can even properly use for sending other messages, but the
+// current implementation of 'UPID::operator bool ()' fails if the IP
+// component of a PID is '0'.
+TEST_F(DockerContainerizerTest, DISABLED_ROOT_DOCKER_SlaveRecoveryExecutorContainer)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  // Set up recovery slave flags.
+  flags.checkpoint = true;
+  flags.recover = "reconnect";
+  flags.strict = true;
+
+  Docker docker = Docker::create(tests::flags.docker, false).get();
+
+  MockDockerContainerizer* dockerContainerizer1 =
+    new MockDockerContainerizer(flags, docker);
+
+  Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
+  ASSERT_SOME(slave1);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  ExecutorInfo executorInfo;
+  ExecutorID executorId;
+  executorId.set_value("e1");
+  executorInfo.mutable_executor_id()->CopyFrom(executorId);
+
+  CommandInfo command;
+  command.set_value("test-executor");
+  executorInfo.mutable_command()->CopyFrom(command);
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("mesosphere/test-executor");
+
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+  executorInfo.mutable_container()->CopyFrom(containerInfo);
+
+  task.mutable_executor()->CopyFrom(executorInfo);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<ContainerID> containerId;
+  Future<SlaveID> slaveId;
+  EXPECT_CALL(*dockerContainerizer1, launch(_, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    FutureArg<4>(&slaveId),
+                    Invoke(dockerContainerizer1,
+                           &MockDockerContainerizer::_launchExecutor)));
+
+  // The test-executor in the image immediately sends a TASK_RUNNING
+  // followed by TASK_FINISHED (no sleep/delay in between) so we need
+  // to drop the first TWO updates that come from the executor rather
+  // than only the first update like above where we can control
+  // the length of the task.
+  Future<StatusUpdateMessage> statusUpdateMessage1 =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, _);
+
+  // Drop the second update from the executor.
+  Future<StatusUpdateMessage> statusUpdateMessage2 =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, _);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(containerId);
+  AWAIT_READY(slaveId);
+
+  // We also need to wait until the container's pid has been
+  // checkpointed so that when the next slave recovers it won't treat
+  // the executor as having gone lost!
+  string path = slave::paths::getForkedPidPath(
+      slave::paths::getMetaRootDir(flags.work_dir),
+      slaveId.get(),
+      frameworkId.get(),
+      executorId,
+      containerId.get());
+
+  Duration waited = Duration::zero();
+  do {
+    if (os::exists(path)) {
+      Try<string> read = os::read(path);
+      if (read.isSome() && read.get() != "") {
+        break;
+      }
+    }
+    os::sleep(Milliseconds(100));
+    waited += Milliseconds(100);
+  } while (waited < Seconds(3));
+
+  ASSERT_TRUE(os::exists(path));
+  ASSERT_SOME_NE("", os::read(path));
+
+  // Stop the slave before the status updates are received.
+  AWAIT_READY(statusUpdateMessage1);
+  AWAIT_READY(statusUpdateMessage2);
+
+  Stop(slave1.get());
+
+  delete dockerContainerizer1;
+
+  Future<Message> reregisterExecutorMessage =
+    FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  MockDockerContainerizer* dockerContainerizer2 =
+    new MockDockerContainerizer(flags, docker);
+
+  Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
+  ASSERT_SOME(slave2);
+
+  // Ensure the executor re-registers.
+  AWAIT_READY(reregisterExecutorMessage);
+  UPID executorPid = reregisterExecutorMessage.get().from;
+
+  ReregisterExecutorMessage reregister;
+  reregister.ParseFromString(reregisterExecutorMessage.get().body);
+
+  // Executor should inform about the unacknowledged update.
+  ASSERT_EQ(1, reregister.updates_size());
+  const StatusUpdate& update = reregister.updates(0);
+  ASSERT_EQ(task.task_id(), update.status().task_id());
+  ASSERT_EQ(TASK_RUNNING, update.status().state());
+
+  // Scheduler should receive the recovered update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_RUNNING, status.get().state());
+
+  // Make sure the container is still running.
+  Future<list<Docker::Container> > containers =
+    docker.ps(true, slave::DOCKER_NAME_PREFIX);
+
+  AWAIT_READY(containers);
+
+  ASSERT_TRUE(exists(containers.get(), containerId.get()));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+
+  delete dockerContainerizer2;
+}


[4/5] git commit: Validate Docker version since we require >= 1.0.0.

Posted by be...@apache.org.
Validate Docker version since we require >= 1.0.0.

Review: https://reviews.apache.org/r/24767


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a7b706ae
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a7b706ae
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a7b706ae

Branch: refs/heads/master
Commit: a7b706ae891709d4915951cb1ac714d436688be1
Parents: 337e955
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Aug 15 22:03:00 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Aug 16 08:25:39 2014 -0700

----------------------------------------------------------------------
 src/docker/docker.cpp | 39 +++++++++++++++++++++++++++++++++++----
 1 file changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a7b706ae/src/docker/docker.cpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp
index a4bad5b..73c62f9 100644
--- a/src/docker/docker.cpp
+++ b/src/docker/docker.cpp
@@ -106,13 +106,14 @@ Try<Docker> Docker::create(const string& path, bool validate)
                  "to mount cgroups manually!");
   }
 
-  std::string cmd = path + " info";
+  // Validate the version (and that we can use Docker at all).
+  string cmd = path + " version";
 
   Try<Subprocess> s = subprocess(
       cmd,
       Subprocess::PATH("/dev/null"),
-      Subprocess::PATH("/dev/null"),
-      Subprocess::PATH("/dev/null"));
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
 
   if (s.isError()) {
     return Error(s.error());
@@ -134,7 +135,37 @@ Try<Docker> Docker::create(const string& path, bool validate)
     return Error(msg);
   }
 
-  return Docker(path);
+  CHECK_SOME(s.get().out());
+
+  Future<string> output = io::read(s.get().out().get());
+
+  if (!output.await(Seconds(5))) {
+    return Error("Timed out reading output from '" + cmd + "'");
+  } else if (output.isFailed()) {
+    return Error("Failed to read output from '" + cmd + "': " +
+                 output.failure());
+  }
+
+  foreach (string line, strings::split(output.get(), "\n")) {
+    line = strings::trim(line);
+    if (strings::startsWith(line, "Client version: ")) {
+      line = line.substr(strlen("Client version: "));
+      vector<string> version = strings::split(line, ".");
+      if (version.size() < 1) {
+        return Error("Failed to parse Docker version '" + line + "'");
+      }
+      Try<int> major = numify<int>(version[0]);
+      if (major.isError()) {
+        return Error("Failed to parse Docker major version '" +
+                     version[0] + "'");
+      } else if (major.get() < 1) {
+        break;
+      }
+      return Docker(path);
+    }
+  }
+
+  return Error("Insufficient version of Docker! Please upgrade to >= 1.0.0");
 }