You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2019/07/26 02:12:45 UTC

[mesos] branch master updated (4b15fbd -> 7e160a3)

This is an automated email from the ASF dual-hosted git repository.

grag pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from 4b15fbd  Exposed agent drain information in the webui.
     new a1e4a9a  Moved the Docker executor declaration into a header.
     new 4cbda17  Enabled the Docker executor to accept kill policy overrides.
     new 7e160a3  Added test to verify that Docker executor can override kill policy.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/CMakeLists.txt                                 |   4 +-
 src/Makefile.am                                    |   3 +-
 src/docker/CMakeLists.txt                          |  20 --
 src/docker/executor.cpp                            | 388 +++++++--------------
 src/docker/executor.hpp                            |  58 +++
 src/exec/exec.cpp                                  |  22 +-
 src/internal/evolve.cpp                            |   6 +
 src/internal/evolve.hpp                            |   1 +
 src/launcher/CMakeLists.txt                        |   5 +
 src/launcher/docker_executor.cpp                   | 266 ++++++++++++++
 .../containerizer/docker_containerizer_tests.cpp   | 172 +++++++++
 11 files changed, 651 insertions(+), 294 deletions(-)
 delete mode 100644 src/docker/CMakeLists.txt
 create mode 100644 src/launcher/docker_executor.cpp


[mesos] 02/03: Enabled the Docker executor to accept kill policy overrides.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 4cbda17f3667e2b0713a1f1663e50819a076680b
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Thu Jul 25 12:17:43 2019 -0700

    Enabled the Docker executor to accept kill policy overrides.
    
    This adds a new `killTask()` overload to the Docker executor
    and updates the Mesos executor driver to call into that
    overload when the loaded executor is the Docker executor.
    
    This allows the executor driver to pass the kill policy
    override, when present, into the Docker executor.
    
    Review: https://reviews.apache.org/r/71034/
---
 src/docker/executor.cpp | 48 ++++++++++++++++++++++++++++++++++++++++++++----
 src/docker/executor.hpp |  5 +++++
 src/exec/exec.cpp       | 22 ++++++++++++++++++----
 3 files changed, 67 insertions(+), 8 deletions(-)

diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
index de8216f..132f42b 100644
--- a/src/docker/executor.cpp
+++ b/src/docker/executor.cpp
@@ -396,15 +396,31 @@ public:
         defer(self(), &Self::launchHealthCheck, containerName, task));
   }
 
-  void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  void killTask(
+      ExecutorDriver* driver,
+      const TaskID& taskId,
+      const Option<KillPolicy>& killPolicyOverride = None())
   {
-    LOG(INFO) << "Received killTask for task " << taskId.value();
+    string overrideMessage = "";
+    if (killPolicyOverride.isSome() && killPolicyOverride->has_grace_period()) {
+      Duration gracePeriodDuration =
+        Nanoseconds(killPolicyOverride->grace_period().nanoseconds());
+
+      overrideMessage =
+        " with grace period override of " + stringify(gracePeriodDuration);
+    }
+
+    LOG(INFO) << "Received killTask" << overrideMessage
+              << " for task " << taskId.value();
 
     // Using shutdown grace period as a default is backwards compatible
     // with the `stop_timeout` flag, deprecated in 1.0.
     Duration gracePeriod = shutdownGracePeriod;
 
-    if (killPolicy.isSome() && killPolicy->has_grace_period()) {
+    if (killPolicyOverride.isSome() && killPolicyOverride->has_grace_period()) {
+      gracePeriod =
+        Nanoseconds(killPolicyOverride->grace_period().nanoseconds());
+    } else if (killPolicy.isSome() && killPolicy->has_grace_period()) {
       gracePeriod = Nanoseconds(killPolicy->grace_period().nanoseconds());
     }
 
@@ -929,7 +945,12 @@ void DockerExecutor::launchTask(ExecutorDriver* driver, const TaskInfo& task)
 
 void DockerExecutor::killTask(ExecutorDriver* driver, const TaskID& taskId)
 {
-  dispatch(process.get(), &DockerExecutorProcess::killTask, driver, taskId);
+  // Need to disambiguate overloaded function.
+  void (DockerExecutorProcess::*killTaskMethod)(
+      ExecutorDriver*, const TaskID&, const Option<KillPolicy>&)
+    = &DockerExecutorProcess::killTask;
+
+  process::dispatch(process.get(), killTaskMethod, driver, taskId, None());
 }
 
 
@@ -955,6 +976,25 @@ void DockerExecutor::error(ExecutorDriver* driver, const string& data)
   dispatch(process.get(), &DockerExecutorProcess::error, driver, data);
 }
 
+
+void DockerExecutor::killTask(
+    ExecutorDriver* driver,
+    const TaskID& taskId,
+    const Option<KillPolicy>& killPolicyOverride)
+{
+  // Need to disambiguate overloaded function.
+  void (DockerExecutorProcess::*killTaskMethod)(
+      ExecutorDriver*, const TaskID&, const Option<KillPolicy>&)
+    = &DockerExecutorProcess::killTask;
+
+  process::dispatch(
+      process.get(),
+      killTaskMethod,
+      driver,
+      taskId,
+      killPolicyOverride);
+}
+
 } // namespace docker {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/docker/executor.hpp b/src/docker/executor.hpp
index dfb8ad0..768c2e1 100644
--- a/src/docker/executor.hpp
+++ b/src/docker/executor.hpp
@@ -151,6 +151,11 @@ public:
 
   void error(ExecutorDriver* driver, const std::string& data) override;
 
+  void killTask(
+      ExecutorDriver* driver,
+      const TaskID& taskId,
+      const Option<KillPolicy>& killPolicyOverride);
+
 private:
   process::Owned<DockerExecutorProcess> process;
 };
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index c0fa3b6..67e082e 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -47,6 +47,8 @@
 
 #include "common/protobuf_utils.hpp"
 
+#include "docker/executor.hpp"
+
 #include "logging/flags.hpp"
 #include "logging/logging.hpp"
 
@@ -183,8 +185,7 @@ public:
         &RunTaskMessage::task);
 
     install<KillTaskMessage>(
-        &ExecutorProcess::killTask,
-        &KillTaskMessage::task_id);
+        &ExecutorProcess::killTask);
 
     install<StatusUpdateAcknowledgementMessage>(
         &ExecutorProcess::statusUpdateAcknowledgement,
@@ -339,8 +340,10 @@ protected:
     VLOG(1) << "Executor::launchTask took " << stopwatch.elapsed();
   }
 
-  void killTask(const TaskID& taskId)
+  void killTask(KillTaskMessage&& killTaskMessage)
   {
+    const TaskID taskId = killTaskMessage.task_id();
+
     if (aborted.load()) {
       VLOG(1) << "Ignoring kill task message for task " << taskId
               << " because the driver is aborted!";
@@ -365,7 +368,18 @@ protected:
       stopwatch.start();
     }
 
-    executor->killTask(driver, taskId);
+    // If this is a Docker executor, call the `killTask()` overload which
+    // allows the kill policy to be overridden.
+    auto* dockerExecutor = dynamic_cast<docker::DockerExecutor*>(executor);
+    if (dockerExecutor) {
+      Option<KillPolicy> killPolicy = killTaskMessage.has_kill_policy()
+        ? killTaskMessage.kill_policy()
+        : Option<KillPolicy>::none();
+
+      dockerExecutor->killTask(driver, taskId, killPolicy);
+    } else {
+      executor->killTask(driver, taskId);
+    }
 
     VLOG(1) << "Executor::killTask took " << stopwatch.elapsed();
   }


[mesos] 03/03: Added test to verify that Docker executor can override kill policy.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 7e160a36918ad73f79c05cb53a48b7424958e497
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Thu Jul 25 12:17:45 2019 -0700

    Added test to verify that Docker executor can override kill policy.
    
    This adds a test which verifies that when a scheduler attempts to
    override a task's default kill policy, the Docker executor will
    honor that override.
    
    Review: https://reviews.apache.org/r/71035/
---
 src/internal/evolve.cpp                            |   6 +
 src/internal/evolve.hpp                            |   1 +
 .../containerizer/docker_containerizer_tests.cpp   | 172 +++++++++++++++++++++
 3 files changed, 179 insertions(+)

diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp
index 81de15e..c5e4151 100644
--- a/src/internal/evolve.cpp
+++ b/src/internal/evolve.cpp
@@ -86,6 +86,12 @@ v1::AgentInfo evolve(const SlaveInfo& slaveInfo)
 }
 
 
+v1::ContainerInfo evolve(const ContainerInfo& containerInfo)
+{
+  return evolve<v1::ContainerInfo>(containerInfo);
+}
+
+
 v1::DomainInfo evolve(const DomainInfo& domainInfo)
 {
   return evolve<v1::DomainInfo>(domainInfo);
diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp
index ffbb342..e4e3ab4 100644
--- a/src/internal/evolve.hpp
+++ b/src/internal/evolve.hpp
@@ -62,6 +62,7 @@ namespace internal {
 // Helpers for evolving types between versions. Please add as necessary!
 v1::AgentID evolve(const SlaveID& slaveId);
 v1::AgentInfo evolve(const SlaveInfo& slaveInfo);
+v1::ContainerInfo evolve(const ContainerInfo& containerInfo);
 v1::DomainInfo evolve(const DomainInfo& domainInfo);
 v1::DrainInfo evolve(const DrainInfo& drainInfo);
 v1::ExecutorID evolve(const ExecutorID& executorId);
diff --git a/src/tests/containerizer/docker_containerizer_tests.cpp b/src/tests/containerizer/docker_containerizer_tests.cpp
index a621758..3d932a5 100644
--- a/src/tests/containerizer/docker_containerizer_tests.cpp
+++ b/src/tests/containerizer/docker_containerizer_tests.cpp
@@ -20,6 +20,8 @@
 
 #include <mesos/slave/container_logger.hpp>
 
+#include <mesos/v1/mesos.hpp>
+
 #include <process/collect.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
@@ -5240,6 +5242,176 @@ TEST_F(HungDockerTest, ROOT_DOCKER_InspectHungDuringPull)
   driver.join();
 }
 
+
+// This test is disabled on windows due to the bash-specific
+// command used in the task below.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(
+    DockerContainerizerTest, ROOT_DOCKER_OverrideKillPolicy)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockDocker* mockDocker =
+    new MockDocker(tests::flags.docker, tests::flags.docker_socket);
+
+  Shared<Docker> docker(mockDocker);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  Fetcher fetcher(flags);
+
+  Try<ContainerLogger*> logger =
+    ContainerLogger::create(flags.container_logger);
+
+  ASSERT_SOME(logger);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  MockDockerContainerizer dockerContainerizer(
+      flags,
+      &fetcher,
+      Owned<ContainerLogger>(logger.get()),
+      docker);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Try<v1::Resources> parsed =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32");
+
+  ASSERT_SOME(parsed);
+
+  v1::Resources resources = parsed.get();
+
+  // Create a task which ignores SIGTERM so that we can detect
+  // when the task receives SIGKILL.
+  v1::TaskInfo taskInfo = v1::createTask(
+      agentId,
+      resources,
+      "trap \"echo 'SIGTERM received'\" SIGTERM; sleep 999999");
+
+  // TODO(tnachen): Use local image to test if possible.
+  taskInfo.mutable_container()->CopyFrom(
+      evolve(createDockerInfo(DOCKER_TEST_IMAGE)));
+
+  {
+    // Set a long grace period on the task's kill policy so that we
+    // can detect if the override is effective.
+    mesos::v1::DurationInfo gracePeriod;
+    gracePeriod.set_nanoseconds(Minutes(10).ns());
+
+    mesos::v1::KillPolicy killPolicy;
+    killPolicy.mutable_grace_period()->CopyFrom(gracePeriod);
+
+    taskInfo.mutable_kill_policy()->CopyFrom(killPolicy);
+  }
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH({taskInfo})}));
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY_FOR(startingUpdate, Seconds(60));
+  EXPECT_EQ(v1::TASK_STARTING, startingUpdate->status().state());
+  AWAIT_READY_FOR(runningUpdate, Seconds(60));
+  EXPECT_EQ(v1::TASK_RUNNING, runningUpdate->status().state());
+
+  ASSERT_TRUE(
+    exists(docker, containerId.get(), ContainerState::RUNNING));
+
+  Future<v1::scheduler::Event::Update> killedUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&killedUpdate));
+
+  Future<Option<ContainerTermination>> termination =
+    dockerContainerizer.wait(containerId.get());
+
+  {
+    // Set a short grace period on the kill call so that we
+    // can detect if the override is effective.
+    mesos::v1::DurationInfo gracePeriod;
+    gracePeriod.set_nanoseconds(100);
+
+    mesos::v1::KillPolicy killPolicy;
+    killPolicy.mutable_grace_period()->CopyFrom(gracePeriod);
+
+    mesos.send(
+        v1::createCallKill(
+            frameworkId,
+            taskInfo.task_id(),
+            agentId,
+            killPolicy));
+  }
+
+  AWAIT_READY(killedUpdate);
+  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+
+  AWAIT_READY(termination);
+  EXPECT_SOME(termination.get());
+
+  // Even though the task is killed, the executor should exit gracefully.
+  ASSERT_TRUE(termination.get()->has_status());
+  EXPECT_EQ(0, termination.get()->status());
+
+  ASSERT_FALSE(
+      exists(docker, containerId.get(), ContainerState::RUNNING, false));
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[mesos] 01/03: Moved the Docker executor declaration into a header.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit a1e4a9aa1d6f2dee9fd56432122c2fa6a35edb77
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Thu Jul 25 12:17:41 2019 -0700

    Moved the Docker executor declaration into a header.
    
    This moves the declaration of the Docker executor into the
    Docker executor header file and moves the code for the Docker
    executor binary into a new launcher implementation file.
    
    This change will enable the Mesos executor driver
    implementation to make use of the `DockerExecutor` symbol.
    
    Review: https://reviews.apache.org/r/71033/
---
 src/CMakeLists.txt               |   4 +-
 src/Makefile.am                  |   3 +-
 src/docker/CMakeLists.txt        |  20 ---
 src/docker/executor.cpp          | 348 +++++++++------------------------------
 src/docker/executor.hpp          |  53 ++++++
 src/launcher/CMakeLists.txt      |   5 +
 src/launcher/docker_executor.cpp | 266 ++++++++++++++++++++++++++++++
 7 files changed, 409 insertions(+), 290 deletions(-)

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c455ed6..218a75e 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -263,7 +263,8 @@ set(CSI_SRC
   csi/volume_manager.cpp)
 
 set(DOCKER_SRC
-  docker/docker.cpp)
+  docker/docker.cpp
+  docker/executor.cpp)
 
 if (NOT WIN32)
   list(APPEND DOCKER_SRC
@@ -644,7 +645,6 @@ endif ()
 ##############################
 add_subdirectory(checks)
 add_subdirectory(cli)
-add_subdirectory(docker)
 add_subdirectory(examples)
 add_subdirectory(launcher)
 add_subdirectory(local)
diff --git a/src/Makefile.am b/src/Makefile.am
index 46c66f1..697ab10 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1092,6 +1092,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   docker/docker.cpp							\
   docker/docker.hpp							\
   docker/executor.hpp							\
+  docker/executor.cpp							\
   docker/spec.cpp							\
   examples/flags.hpp							\
   examples/test_anonymous_module.hpp					\
@@ -1818,7 +1819,7 @@ mesos_usage_CPPFLAGS = $(MESOS_CPPFLAGS)
 mesos_usage_LDADD = libmesos.la $(LDADD)
 
 pkglibexec_PROGRAMS += mesos-docker-executor
-mesos_docker_executor_SOURCES = docker/executor.cpp
+mesos_docker_executor_SOURCES = launcher/docker_executor.cpp
 mesos_docker_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
 mesos_docker_executor_LDADD = libmesos.la $(LDADD)
 
diff --git a/src/docker/CMakeLists.txt b/src/docker/CMakeLists.txt
deleted file mode 100644
index 1196664..0000000
--- a/src/docker/CMakeLists.txt
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# THE DOCKER EXECUTOR EXECUTABLE.
-#################################
-add_executable(mesos-docker-executor executor.cpp)
-target_link_libraries(mesos-docker-executor PRIVATE mesos)
diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
index f638e4b..de8216f 100644
--- a/src/docker/executor.cpp
+++ b/src/docker/executor.cpp
@@ -856,291 +856,105 @@ private:
 };
 
 
-class DockerExecutor : public Executor
+DockerExecutor::DockerExecutor(
+    const Owned<Docker>& docker,
+    const string& container,
+    const string& sandboxDirectory,
+    const string& mappedDirectory,
+    const Duration& shutdownGracePeriod,
+    const string& launcherDir,
+    const map<string, string>& taskEnvironment,
+    const Option<ContainerDNSInfo>& defaultContainerDNS,
+    bool cgroupsEnableCfs)
 {
-public:
-  DockerExecutor(
-      const Owned<Docker>& docker,
-      const string& container,
-      const string& sandboxDirectory,
-      const string& mappedDirectory,
-      const Duration& shutdownGracePeriod,
-      const string& launcherDir,
-      const map<string, string>& taskEnvironment,
-      const Option<ContainerDNSInfo>& defaultContainerDNS,
-      bool cgroupsEnableCfs)
-  {
-    process = Owned<DockerExecutorProcess>(new DockerExecutorProcess(
-        docker,
-        container,
-        sandboxDirectory,
-        mappedDirectory,
-        shutdownGracePeriod,
-        launcherDir,
-        taskEnvironment,
-        defaultContainerDNS,
-        cgroupsEnableCfs));
-
-    spawn(process.get());
-  }
-
-  ~DockerExecutor() override
-  {
-    terminate(process.get());
-    wait(process.get());
-  }
-
-  void registered(
-      ExecutorDriver* driver,
-      const ExecutorInfo& executorInfo,
-      const FrameworkInfo& frameworkInfo,
-      const SlaveInfo& slaveInfo) override
-  {
-    dispatch(process.get(),
-             &DockerExecutorProcess::registered,
-             driver,
-             executorInfo,
-             frameworkInfo,
-             slaveInfo);
-  }
-
-  void reregistered(
-      ExecutorDriver* driver,
-      const SlaveInfo& slaveInfo) override
-  {
-    dispatch(process.get(),
-             &DockerExecutorProcess::reregistered,
-             driver,
-             slaveInfo);
-  }
-
-  void disconnected(ExecutorDriver* driver) override
-  {
-    dispatch(process.get(), &DockerExecutorProcess::disconnected, driver);
-  }
-
-  void launchTask(ExecutorDriver* driver, const TaskInfo& task) override
-  {
-    dispatch(process.get(), &DockerExecutorProcess::launchTask, driver, task);
-  }
-
-  void killTask(ExecutorDriver* driver, const TaskID& taskId) override
-  {
-    dispatch(process.get(), &DockerExecutorProcess::killTask, driver, taskId);
-  }
-
-  void frameworkMessage(ExecutorDriver* driver, const string& data) override
-  {
-    dispatch(process.get(),
-             &DockerExecutorProcess::frameworkMessage,
-             driver,
-             data);
-  }
-
-  void shutdown(ExecutorDriver* driver) override
-  {
-    dispatch(process.get(), &DockerExecutorProcess::shutdown, driver);
-  }
-
-  void error(ExecutorDriver* driver, const string& data) override
-  {
-    dispatch(process.get(), &DockerExecutorProcess::error, driver, data);
-  }
-
-private:
-  Owned<DockerExecutorProcess> process;
-};
-
-
-} // namespace docker {
-} // namespace internal {
-} // namespace mesos {
+  process = Owned<DockerExecutorProcess>(new DockerExecutorProcess(
+      docker,
+      container,
+      sandboxDirectory,
+      mappedDirectory,
+      shutdownGracePeriod,
+      launcherDir,
+      taskEnvironment,
+      defaultContainerDNS,
+      cgroupsEnableCfs));
+
+  spawn(process.get());
+}
 
 
-int main(int argc, char** argv)
+DockerExecutor::~DockerExecutor()
 {
-  GOOGLE_PROTOBUF_VERIFY_VERSION;
-
-#ifdef __WINDOWS__
-  // We need a handle to the job object which this container is associated with.
-  // Without this handle, the job object would be destroyed by the OS when the
-  // agent exits (or crashes), making recovery impossible. By holding a handle,
-  // we tie the lifetime of the job object to the container itself. In this way,
-  // a recovering agent can reattach to the container by opening a new handle to
-  // the job object.
-  const pid_t pid = ::GetCurrentProcessId();
-  const Try<std::wstring> name = os::name_job(pid);
-  if (name.isError()) {
-    cerr << "Failed to create job object name from pid: " << name.error()
-         << endl;
-    return EXIT_FAILURE;
-  }
-
-  // NOTE: This handle will not be destructed, even though it is a
-  // `SharedHandle`, because it will (purposefully) never go out of scope.
-  Try<SharedHandle> handle = os::open_job(JOB_OBJECT_QUERY, false, name.get());
-  if (handle.isError()) {
-    cerr << "Failed to open job object '" << stringify(name.get())
-         << "' for the current container: " << handle.error() << endl;
-    return EXIT_FAILURE;
-  }
-#endif // __WINDOWS__
-
-  mesos::internal::docker::Flags flags;
-
-  // Load flags from environment and command line.
-  Try<flags::Warnings> load = flags.load(None(), &argc, &argv);
-
-  if (flags.help) {
-    cout << flags.usage() << endl;
-    return EXIT_SUCCESS;
-  }
-
-  if (load.isError()) {
-    cerr << flags.usage(load.error()) << endl;
-    return EXIT_FAILURE;
-  }
-
-  mesos::internal::logging::initialize(argv[0], true, flags); // Catch signals.
-
-  // Log any flag warnings (after logging is initialized).
-  foreach (const flags::Warning& warning, load->warnings) {
-    LOG(WARNING) << warning.message;
-  }
-
-  VLOG(1) << stringify(flags);
-
-  if (flags.docker.isNone()) {
-    EXIT(EXIT_FAILURE) << flags.usage("Missing required option --docker");
-  }
-
-  if (flags.container.isNone()) {
-    EXIT(EXIT_FAILURE) << flags.usage("Missing required option --container");
-  }
+  terminate(process.get());
+  wait(process.get());
+}
 
-  if (flags.sandbox_directory.isNone()) {
-    EXIT(EXIT_FAILURE)
-      << flags.usage("Missing required option --sandbox_directory");
-  }
 
-  if (flags.mapped_directory.isNone()) {
-    EXIT(EXIT_FAILURE)
-      << flags.usage("Missing required option --mapped_directory");
-  }
+void DockerExecutor::registered(
+    ExecutorDriver* driver,
+    const ExecutorInfo& executorInfo,
+    const FrameworkInfo& frameworkInfo,
+    const SlaveInfo& slaveInfo)
+{
+  dispatch(process.get(),
+           &DockerExecutorProcess::registered,
+           driver,
+           executorInfo,
+           frameworkInfo,
+           slaveInfo);
+}
 
-  map<string, string> taskEnvironment;
-  if (flags.task_environment.isSome()) {
-    // Parse the string as JSON.
-    Try<JSON::Object> json =
-      JSON::parse<JSON::Object>(flags.task_environment.get());
-
-    if (json.isError()) {
-      EXIT(EXIT_FAILURE)
-        << flags.usage("Failed to parse --task_environment: " + json.error());
-    }
 
-    // Convert from JSON to map.
-    foreachpair (
-        const string& key,
-        const JSON::Value& value,
-        json->values) {
-      if (!value.is<JSON::String>()) {
-        EXIT(EXIT_FAILURE) << flags.usage(
-            "Value of key '" + key + "' in --task_environment is not a string");
-      }
+void DockerExecutor::reregistered(
+    ExecutorDriver* driver,
+    const SlaveInfo& slaveInfo)
+{
+  dispatch(process.get(),
+           &DockerExecutorProcess::reregistered,
+           driver,
+           slaveInfo);
+}
 
-      // Save the parsed and validated key/value.
-      taskEnvironment[key] = value.as<JSON::String>().value;
-    }
-  }
 
-  Option<mesos::internal::ContainerDNSInfo> defaultContainerDNS;
-  if (flags.default_container_dns.isSome()) {
-    Try<mesos::internal::ContainerDNSInfo> parse =
-      flags::parse<mesos::internal::ContainerDNSInfo>(
-          flags.default_container_dns.get());
+void DockerExecutor::disconnected(ExecutorDriver* driver)
+{
+  dispatch(process.get(), &DockerExecutorProcess::disconnected, driver);
+}
 
-    if (parse.isError()) {
-      EXIT(EXIT_FAILURE) << flags.usage(
-          "Failed to parse --default_container_dns: " + parse.error());
-    }
 
-    defaultContainerDNS = parse.get();
-  }
+void DockerExecutor::launchTask(ExecutorDriver* driver, const TaskInfo& task)
+{
+  dispatch(process.get(), &DockerExecutorProcess::launchTask, driver, task);
+}
 
-  // Get executor shutdown grace period from the environment.
-  //
-  // NOTE: We avoided introducing a docker executor flag for this
-  // because the docker executor exits if it sees an unknown flag.
-  // This makes it difficult to add or remove docker executor flags
-  // that are unconditionally set by the agent.
-  Duration shutdownGracePeriod =
-    mesos::internal::slave::DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD;
-  Option<string> value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
-  if (value.isSome()) {
-    Try<Duration> parse = Duration::parse(value.get());
-    if (parse.isError()) {
-      EXIT(EXIT_FAILURE)
-        << "Failed to parse value '" << value.get() << "'"
-        << " of 'MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD': " << parse.error();
-    }
 
-    shutdownGracePeriod = parse.get();
-  }
+void DockerExecutor::killTask(ExecutorDriver* driver, const TaskID& taskId)
+{
+  dispatch(process.get(), &DockerExecutorProcess::killTask, driver, taskId);
+}
 
-  // If the deprecated flag is set, respect it and choose the bigger value.
-  //
-  // TODO(alexr): Remove this after the deprecation cycle (started in 1.0).
-  if (flags.stop_timeout.isSome() &&
-      flags.stop_timeout.get() > shutdownGracePeriod) {
-    shutdownGracePeriod = flags.stop_timeout.get();
-  }
 
-  if (flags.launcher_dir.isNone()) {
-    EXIT(EXIT_FAILURE) << flags.usage("Missing required option --launcher_dir");
-  }
+void DockerExecutor::frameworkMessage(
+    ExecutorDriver* driver,
+    const string& data)
+{
+  dispatch(process.get(),
+           &DockerExecutorProcess::frameworkMessage,
+           driver,
+           data);
+}
 
-  process::initialize();
 
-  // The 2nd argument for docker create is set to false so we skip
-  // validation when creating a docker abstraction, as the slave
-  // should have already validated docker.
-  Try<Owned<Docker>> docker = Docker::create(
-      flags.docker.get(),
-      flags.docker_socket.get(),
-      false);
+void DockerExecutor::shutdown(ExecutorDriver* driver)
+{
+  dispatch(process.get(), &DockerExecutorProcess::shutdown, driver);
+}
 
-  if (docker.isError()) {
-    EXIT(EXIT_FAILURE)
-      << "Unable to create docker abstraction: " << docker.error();
-  }
 
-  Owned<mesos::internal::docker::DockerExecutor> executor(
-      new mesos::internal::docker::DockerExecutor(
-          docker.get(),
-          flags.container.get(),
-          flags.sandbox_directory.get(),
-          flags.mapped_directory.get(),
-          shutdownGracePeriod,
-          flags.launcher_dir.get(),
-          taskEnvironment,
-          defaultContainerDNS,
-          flags.cgroups_enable_cfs));
-
-  Owned<mesos::MesosExecutorDriver> driver(
-      new mesos::MesosExecutorDriver(executor.get()));
-
-  bool success = driver->run() == mesos::DRIVER_STOPPED;
-
-  // NOTE: We need to delete the executor and driver before we call
-  // `process::finalize` because the executor/driver will try to terminate
-  // and wait on a libprocess actor in their destructor.
-  driver.reset();
-  executor.reset();
-
-  // NOTE: We need to finalize libprocess, on Windows especially,
-  // as any binary that uses the networking stack on Windows must
-  // also clean up the networking stack before exiting.
-  process::finalize(true);
-  return success ? EXIT_SUCCESS : EXIT_FAILURE;
+void DockerExecutor::error(ExecutorDriver* driver, const string& data)
+{
+  dispatch(process.get(), &DockerExecutorProcess::error, driver, data);
 }
+
+} // namespace docker {
+} // namespace internal {
+} // namespace mesos {
diff --git a/src/docker/executor.hpp b/src/docker/executor.hpp
index f21e84c..dfb8ad0 100644
--- a/src/docker/executor.hpp
+++ b/src/docker/executor.hpp
@@ -22,10 +22,15 @@
 #include <map>
 #include <string>
 
+#include <mesos/executor.hpp>
+
+#include <process/owned.hpp>
 #include <process/process.hpp>
 
 #include <stout/option.hpp>
 
+#include "docker/docker.hpp"
+
 #include "logging/flags.hpp"
 
 namespace mesos {
@@ -102,6 +107,54 @@ struct Flags : public virtual mesos::internal::logging::Flags
   Option<Duration> stop_timeout;
 };
 
+
+class DockerExecutorProcess;
+
+
+class DockerExecutor : public Executor
+{
+public:
+  DockerExecutor(
+      const process::Owned<Docker>& docker,
+      const std::string& container,
+      const std::string& sandboxDirectory,
+      const std::string& mappedDirectory,
+      const Duration& shutdownGracePeriod,
+      const std::string& launcherDir,
+      const std::map<std::string, std::string>& taskEnvironment,
+      const Option<ContainerDNSInfo>& defaultContainerDNS,
+      bool cgroupsEnableCfs);
+
+  ~DockerExecutor() override;
+
+  void registered(
+      ExecutorDriver* driver,
+      const ExecutorInfo& executorInfo,
+      const FrameworkInfo& frameworkInfo,
+      const SlaveInfo& slaveInfo) override;
+
+  void reregistered(
+      ExecutorDriver* driver,
+      const SlaveInfo& slaveInfo) override;
+
+  void disconnected(ExecutorDriver* driver) override;
+
+  void launchTask(ExecutorDriver* driver, const TaskInfo& task) override;
+
+  void killTask(ExecutorDriver* driver, const TaskID& taskId) override;
+
+  void frameworkMessage(
+      ExecutorDriver* driver,
+      const std::string& data) override;
+
+  void shutdown(ExecutorDriver* driver) override;
+
+  void error(ExecutorDriver* driver, const std::string& data) override;
+
+private:
+  process::Owned<DockerExecutorProcess> process;
+};
+
 } // namespace docker {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/launcher/CMakeLists.txt b/src/launcher/CMakeLists.txt
index 2ffa946..73587f4 100644
--- a/src/launcher/CMakeLists.txt
+++ b/src/launcher/CMakeLists.txt
@@ -24,3 +24,8 @@ target_link_libraries(mesos-executor PRIVATE mesos)
 
 add_executable(mesos-fetcher fetcher.cpp)
 target_link_libraries(mesos-fetcher PRIVATE mesos)
+
+# THE DOCKER EXECUTOR EXECUTABLE.
+#################################
+add_executable(mesos-docker-executor docker_executor.cpp)
+target_link_libraries(mesos-docker-executor PRIVATE mesos)
diff --git a/src/launcher/docker_executor.cpp b/src/launcher/docker_executor.cpp
new file mode 100644
index 0000000..3f12c78
--- /dev/null
+++ b/src/launcher/docker_executor.cpp
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <stdio.h>
+
+#include <map>
+#include <string>
+
+#include <mesos/executor.hpp>
+#include <mesos/mesos.hpp>
+
+#include <process/collect.hpp>
+#include <process/delay.hpp>
+#include <process/id.hpp>
+#include <process/loop.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+#include <process/reap.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/error.hpp>
+#include <stout/flags.hpp>
+#include <stout/json.hpp>
+#include <stout/lambda.hpp>
+#include <stout/os.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/try.hpp>
+#ifdef __WINDOWS__
+#include <stout/windows.hpp>
+#endif // __WINDOWS__
+
+#include <stout/os/killtree.hpp>
+
+#ifdef __WINDOWS__
+#include <stout/os/windows/jobobject.hpp>
+#endif // __WINDOWS__
+
+#include "checks/checks_runtime.hpp"
+#include "checks/health_checker.hpp"
+
+#include "common/protobuf_utils.hpp"
+#include "common/status_utils.hpp"
+
+#include "docker/docker.hpp"
+#include "docker/executor.hpp"
+
+#include "logging/flags.hpp"
+#include "logging/logging.hpp"
+
+#include "messages/flags.hpp"
+#include "messages/messages.hpp"
+
+#include "slave/constants.hpp"
+
+using namespace mesos;
+using namespace process;
+
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::map;
+using std::string;
+using std::vector;
+
+
+int main(int argc, char** argv)
+{
+  GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+#ifdef __WINDOWS__
+  // We need a handle to the job object which this container is associated with.
+  // Without this handle, the job object would be destroyed by the OS when the
+  // agent exits (or crashes), making recovery impossible. By holding a handle,
+  // we tie the lifetime of the job object to the container itself. In this way,
+  // a recovering agent can reattach to the container by opening a new handle to
+  // the job object.
+  const pid_t pid = ::GetCurrentProcessId();
+  const Try<std::wstring> name = os::name_job(pid);
+  if (name.isError()) {
+    cerr << "Failed to create job object name from pid: " << name.error()
+         << endl;
+    return EXIT_FAILURE;
+  }
+
+  // NOTE: This handle will not be destructed, even though it is a
+  // `SharedHandle`, because it will (purposefully) never go out of scope.
+  Try<SharedHandle> handle = os::open_job(JOB_OBJECT_QUERY, false, name.get());
+  if (handle.isError()) {
+    cerr << "Failed to open job object '" << stringify(name.get())
+         << "' for the current container: " << handle.error() << endl;
+    return EXIT_FAILURE;
+  }
+#endif // __WINDOWS__
+
+  mesos::internal::docker::Flags flags;
+
+  // Load flags from environment and command line.
+  Try<flags::Warnings> load = flags.load(None(), &argc, &argv);
+
+  if (flags.help) {
+    cout << flags.usage() << endl;
+    return EXIT_SUCCESS;
+  }
+
+  if (load.isError()) {
+    cerr << flags.usage(load.error()) << endl;
+    return EXIT_FAILURE;
+  }
+
+  mesos::internal::logging::initialize(argv[0], true, flags); // Catch signals.
+
+  // Log any flag warnings (after logging is initialized).
+  foreach (const flags::Warning& warning, load->warnings) {
+    LOG(WARNING) << warning.message;
+  }
+
+  VLOG(1) << stringify(flags);
+
+  if (flags.docker.isNone()) {
+    EXIT(EXIT_FAILURE) << flags.usage("Missing required option --docker");
+  }
+
+  if (flags.container.isNone()) {
+    EXIT(EXIT_FAILURE) << flags.usage("Missing required option --container");
+  }
+
+  if (flags.sandbox_directory.isNone()) {
+    EXIT(EXIT_FAILURE)
+      << flags.usage("Missing required option --sandbox_directory");
+  }
+
+  if (flags.mapped_directory.isNone()) {
+    EXIT(EXIT_FAILURE)
+      << flags.usage("Missing required option --mapped_directory");
+  }
+
+  map<string, string> taskEnvironment;
+  if (flags.task_environment.isSome()) {
+    // Parse the string as JSON.
+    Try<JSON::Object> json =
+      JSON::parse<JSON::Object>(flags.task_environment.get());
+
+    if (json.isError()) {
+      EXIT(EXIT_FAILURE)
+        << flags.usage("Failed to parse --task_environment: " + json.error());
+    }
+
+    // Convert from JSON to map.
+    foreachpair (
+        const string& key,
+        const JSON::Value& value,
+        json->values) {
+      if (!value.is<JSON::String>()) {
+        EXIT(EXIT_FAILURE) << flags.usage(
+            "Value of key '" + key + "' in --task_environment is not a string");
+      }
+
+      // Save the parsed and validated key/value.
+      taskEnvironment[key] = value.as<JSON::String>().value;
+    }
+  }
+
+  Option<mesos::internal::ContainerDNSInfo> defaultContainerDNS;
+  if (flags.default_container_dns.isSome()) {
+    Try<mesos::internal::ContainerDNSInfo> parse =
+      flags::parse<mesos::internal::ContainerDNSInfo>(
+          flags.default_container_dns.get());
+
+    if (parse.isError()) {
+      EXIT(EXIT_FAILURE) << flags.usage(
+          "Failed to parse --default_container_dns: " + parse.error());
+    }
+
+    defaultContainerDNS = parse.get();
+  }
+
+  // Get executor shutdown grace period from the environment.
+  //
+  // NOTE: We avoided introducing a docker executor flag for this
+  // because the docker executor exits if it sees an unknown flag.
+  // This makes it difficult to add or remove docker executor flags
+  // that are unconditionally set by the agent.
+  Duration shutdownGracePeriod =
+    mesos::internal::slave::DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD;
+  Option<string> value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
+  if (value.isSome()) {
+    Try<Duration> parse = Duration::parse(value.get());
+    if (parse.isError()) {
+      EXIT(EXIT_FAILURE)
+        << "Failed to parse value '" << value.get() << "'"
+        << " of 'MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD': " << parse.error();
+    }
+
+    shutdownGracePeriod = parse.get();
+  }
+
+  // If the deprecated flag is set, respect it and choose the bigger value.
+  //
+  // TODO(alexr): Remove this after the deprecation cycle (started in 1.0).
+  if (flags.stop_timeout.isSome() &&
+      flags.stop_timeout.get() > shutdownGracePeriod) {
+    shutdownGracePeriod = flags.stop_timeout.get();
+  }
+
+  if (flags.launcher_dir.isNone()) {
+    EXIT(EXIT_FAILURE) << flags.usage("Missing required option --launcher_dir");
+  }
+
+  process::initialize();
+
+  // The 3rd argument for docker create is set to false so we skip
+  // validation when creating a docker abstraction, as the slave
+  // should have already validated docker.
+  Try<Owned<Docker>> docker = Docker::create(
+      flags.docker.get(),
+      flags.docker_socket.get(),
+      false);
+
+  if (docker.isError()) {
+    EXIT(EXIT_FAILURE)
+      << "Unable to create docker abstraction: " << docker.error();
+  }
+
+  Owned<mesos::internal::docker::DockerExecutor> executor(
+      new mesos::internal::docker::DockerExecutor(
+          docker.get(),
+          flags.container.get(),
+          flags.sandbox_directory.get(),
+          flags.mapped_directory.get(),
+          shutdownGracePeriod,
+          flags.launcher_dir.get(),
+          taskEnvironment,
+          defaultContainerDNS,
+          flags.cgroups_enable_cfs));
+
+  Owned<mesos::MesosExecutorDriver> driver(
+      new mesos::MesosExecutorDriver(executor.get()));
+
+  bool success = driver->run() == mesos::DRIVER_STOPPED;
+
+  // NOTE: We need to delete the executor and driver before we call
+  // `process::finalize` because the executor/driver will try to terminate
+  // and wait on a libprocess actor in their destructor.
+  driver.reset();
+  executor.reset();
+
+  // NOTE: We need to finalize libprocess, on Windows especially,
+  // as any binary that uses the networking stack on Windows must
+  // also clean up the networking stack before exiting.
+  process::finalize(true);
+  return success ? EXIT_SUCCESS : EXIT_FAILURE;
+}