You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/11/30 00:04:22 UTC

[11/13] mesos git commit: Added a container daemon to monitor a long-running standalone container.

Added a container daemon to monitor a long-running standalone container.

The `ContainerDaemon` class is responsible for monitoring a long-running
service running in a standalone container, and for restarting the service
container after it exits. It takes the following hook functions:

* `postStartHook`: called after the container is launched every time.
* `postStopHook`: called after the container exits every time.

`ContainerDaemon` does not manage the lifecycle of the container it
monitors, so the container persists across `ContainerDaemon`s.

Review: https://reviews.apache.org/r/63680/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5c979635
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5c979635
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5c979635

Branch: refs/heads/master
Commit: 5c979635dfc5c20ffb48672396eb5ca4aeb3a2c7
Parents: eb7f16b
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed Nov 29 15:31:15 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Nov 29 15:31:15 2017 -0800

----------------------------------------------------------------------
 src/CMakeLists.txt             |   1 +
 src/Makefile.am                |   2 +
 src/slave/container_daemon.cpp | 278 ++++++++++++++++++++++++++++++++++++
 src/slave/container_daemon.hpp |  92 ++++++++++++
 4 files changed, 373 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5c979635/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 3f5d946..76ef6ca 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -246,6 +246,7 @@ configure_file(
 #####################################
 set(AGENT_SRC
   slave/constants.cpp
+  slave/container_daemon.cpp
   slave/container_logger.cpp
   slave/flags.cpp
   slave/gc.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/5c979635/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 9438a7e..8dcc367 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1012,6 +1012,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   scheduler/scheduler.cpp						\
   secret/resolver.cpp							\
   slave/constants.cpp							\
+  slave/container_daemon.cpp						\
   slave/container_logger.cpp						\
   slave/flags.cpp							\
   slave/gc.cpp								\
@@ -1159,6 +1160,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   scheduler/constants.hpp						\
   scheduler/flags.hpp							\
   slave/constants.hpp							\
+  slave/container_daemon.hpp						\
   slave/flags.hpp							\
   slave/gc.hpp								\
   slave/gc_process.hpp							\

http://git-wip-us.apache.org/repos/asf/mesos/blob/5c979635/src/slave/container_daemon.cpp
----------------------------------------------------------------------
diff --git a/src/slave/container_daemon.cpp b/src/slave/container_daemon.cpp
new file mode 100644
index 0000000..2e6c748
--- /dev/null
+++ b/src/slave/container_daemon.cpp
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "slave/container_daemon.hpp"
+
+#include <mesos/agent/agent.hpp>
+
+#include <process/defer.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/stringify.hpp>
+#include <stout/unreachable.hpp>
+
+#include "common/http.hpp"
+
+#include "internal/evolve.hpp"
+
+namespace http = process::http;
+
+using std::string;
+
+using mesos::agent::Call;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::Process;
+using process::Promise;
+
+using process::defer;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Builds the HTTP headers used to authenticate calls to the V1 agent
+// API: an 'Authorization: Bearer <token>' entry when `authToken` is
+// set, and no entries otherwise.
+// TODO(chhsiao): Currently we assume the JWT authenticator is used for
+// the agent operator API.
+static inline http::Headers getAuthHeader(const Option<string>& authToken)
+{
+  http::Headers authHeader;
+
+  // Without a token there is nothing to add; return the empty headers.
+  if (authToken.isNone()) {
+    return authHeader;
+  }
+
+  authHeader["Authorization"] = "Bearer " + authToken.get();
+
+  return authHeader;
+}
+
+
+// Libprocess actor behind `ContainerDaemon`. It drives the launch cycle
+// by alternating between `launchContainer()` and `waitContainer()`, and
+// records any failed or discarded step in the `terminated` promise.
+class ContainerDaemonProcess : public Process<ContainerDaemonProcess>
+{
+public:
+  // Stores the agent endpoint, the optional auth token and the hooks,
+  // and builds the `LAUNCH_CONTAINER` and `WAIT_CONTAINER` calls for
+  // `containerId`.
+  explicit ContainerDaemonProcess(
+      const http::URL& _agentUrl,
+      const Option<string>& _authToken,
+      const ContainerID& containerId,
+      const Option<CommandInfo>& commandInfo,
+      const Option<Resources>& resources,
+      const Option<ContainerInfo>& containerInfo,
+      const Option<std::function<Future<Nothing>()>>& _postStartHook,
+      const Option<std::function<Future<Nothing>()>>& _postStopHook);
+
+  // Not copyable.
+  ContainerDaemonProcess(const ContainerDaemonProcess& other) = delete;
+
+  ContainerDaemonProcess& operator=(
+      const ContainerDaemonProcess& other) = delete;
+
+  // Returns the future of the `terminated` promise.
+  Future<Nothing> wait();
+
+protected:
+  // Starts the launch cycle by calling `launchContainer()`.
+  void initialize() override;
+
+private:
+  // Posts `launchCall` to the agent, runs `postStartHook` (if any) on
+  // an accepted response, and then continues with `waitContainer()`.
+  void launchContainer();
+
+  // Posts `waitCall` to the agent, runs `postStopHook` (if any) on an
+  // accepted response, and then continues with `launchContainer()`.
+  void waitContainer();
+
+  const http::URL agentUrl;
+  const Option<string> authToken;
+  const ContentType contentType; // Always `ContentType::PROTOBUF`.
+  const Option<std::function<Future<Nothing>()>> postStartHook;
+  const Option<std::function<Future<Nothing>()>> postStopHook;
+
+  // Calls prepared once in the constructor and re-sent on every cycle.
+  Call launchCall;
+  Call waitCall;
+
+  // Failed or discarded when a step of the launch cycle fails or is
+  // discarded; never set to ready by this class.
+  Promise<Nothing> terminated;
+};
+
+
+// Captures the agent endpoint, credentials and hooks, and prepares the
+// `LAUNCH_CONTAINER` and `WAIT_CONTAINER` calls for `containerId` so
+// they can be re-sent on every iteration of the launch cycle.
+ContainerDaemonProcess::ContainerDaemonProcess(
+    const http::URL& _agentUrl,
+    const Option<string>& _authToken,
+    const ContainerID& containerId,
+    const Option<CommandInfo>& commandInfo,
+    const Option<Resources>& resources,
+    const Option<ContainerInfo>& containerInfo,
+    const Option<std::function<Future<Nothing>()>>& _postStartHook,
+    const Option<std::function<Future<Nothing>()>>& _postStopHook)
+  : ProcessBase(process::ID::generate("container-daemon")),
+    agentUrl(_agentUrl),
+    authToken(_authToken),
+    contentType(ContentType::PROTOBUF),
+    postStartHook(_postStartHook),
+    postStopHook(_postStopHook)
+{
+  // Launch call: the container ID is always set, the remaining fields
+  // only when the caller provided them.
+  launchCall.set_type(Call::LAUNCH_CONTAINER);
+
+  auto* launch = launchCall.mutable_launch_container();
+  launch->mutable_container_id()->CopyFrom(containerId);
+
+  if (commandInfo.isSome()) {
+    launch->mutable_command()->CopyFrom(commandInfo.get());
+  }
+
+  if (resources.isSome()) {
+    launch->mutable_resources()->CopyFrom(resources.get());
+  }
+
+  if (containerInfo.isSome()) {
+    launch->mutable_container()->CopyFrom(containerInfo.get());
+  }
+
+  // Wait call: only needs the container ID.
+  waitCall.set_type(Call::WAIT_CONTAINER);
+
+  auto* wait = waitCall.mutable_wait_container();
+  wait->mutable_container_id()->CopyFrom(containerId);
+}
+
+
+// Returns the future associated with the `terminated` promise, which
+// the launch cycle fails or discards when one of its steps does.
+Future<Nothing> ContainerDaemonProcess::wait()
+{
+  return terminated.future();
+}
+
+
+// Kicks off the launch cycle with the first `launchContainer()` call.
+void ContainerDaemonProcess::initialize()
+{
+  launchContainer();
+}
+
+
+// Posts the prepared `LAUNCH_CONTAINER` call to the agent. On an
+// `OK` or `Accepted` response the `postStartHook` (if any) is run, and
+// once that is ready the cycle continues with `waitContainer()`. Any
+// failure or discard is forwarded to the `terminated` promise.
+void ContainerDaemonProcess::launchContainer()
+{
+  Future<http::Response> launching = http::post(
+      agentUrl,
+      getAuthHeader(authToken),
+      serialize(contentType, evolve(launchCall)),
+      stringify(contentType));
+
+  Future<Nothing> started = launching.then(defer(self(), [=](
+      const http::Response& response) -> Future<Nothing> {
+    const bool launched =
+      response.status == http::OK().status ||
+      response.status == http::Accepted().status;
+
+    if (!launched) {
+      return Failure(
+          "Failed to launch container '" +
+          stringify(launchCall.launch_container().container_id()) +
+          "': Unexpected response '" + response.status + "' (" +
+          response.body + ")");
+    }
+
+    if (postStartHook.isNone()) {
+      return Nothing();
+    }
+
+    return postStartHook.get()();
+  }));
+
+  started
+    .onReady(defer(self(), &Self::waitContainer))
+    .onFailed(defer(self(), [=](const string& failure) {
+      terminated.fail(failure);
+    }))
+    .onDiscarded(defer(self(), [=] {
+      terminated.discard();
+    }));
+}
+
+
+// Posts the prepared `WAIT_CONTAINER` call to the agent. On an `OK` or
+// `NotFound` response the `postStopHook` (if any) is run, and once that
+// is ready the cycle restarts with `launchContainer()`. Any failure or
+// discard is forwarded to the `terminated` promise.
+// NOTE(review): `NotFound` is presumably accepted because the container
+// may already be gone by the time we wait on it -- confirm.
+void ContainerDaemonProcess::waitContainer()
+{
+  Future<http::Response> waiting = http::post(
+      agentUrl,
+      getAuthHeader(authToken),
+      serialize(contentType, evolve(waitCall)),
+      stringify(contentType));
+
+  Future<Nothing> stopped = waiting.then(defer(self(), [=](
+      const http::Response& response) -> Future<Nothing> {
+    const bool exited =
+      response.status == http::OK().status ||
+      response.status == http::NotFound().status;
+
+    if (!exited) {
+      return Failure(
+          "Failed to wait for container '" +
+          stringify(waitCall.wait_container().container_id()) +
+          "': Unexpected response '" + response.status + "' (" +
+          response.body + ")");
+    }
+
+    if (postStopHook.isNone()) {
+      return Nothing();
+    }
+
+    return postStopHook.get()();
+  }));
+
+  stopped
+    .onReady(defer(self(), &Self::launchContainer))
+    .onFailed(defer(self(), [=](const string& failure) {
+      terminated.fail(failure);
+    }))
+    .onDiscarded(defer(self(), [=] {
+      terminated.discard();
+    }));
+}
+
+
+// Factory for `ContainerDaemon`: forwards all arguments to the
+// constructor and returns the new instance wrapped in an `Owned`.
+Try<Owned<ContainerDaemon>> ContainerDaemon::create(
+    const http::URL& agentUrl,
+    const Option<string>& authToken,
+    const ContainerID& containerId,
+    const Option<CommandInfo>& commandInfo,
+    const Option<Resources>& resources,
+    const Option<ContainerInfo>& containerInfo,
+    const Option<std::function<Future<Nothing>()>>& postStartHook,
+    const Option<std::function<Future<Nothing>()>>& postStopHook)
+{
+  ContainerDaemon* daemon = new ContainerDaemon(
+      agentUrl,
+      authToken,
+      containerId,
+      commandInfo,
+      resources,
+      containerInfo,
+      postStartHook,
+      postStopHook);
+
+  return Owned<ContainerDaemon>(daemon);
+}
+
+
+// Creates the backing `ContainerDaemonProcess` from the given arguments
+// and spawns it.
+ContainerDaemon::ContainerDaemon(
+    const http::URL& agentUrl,
+    const Option<string>& authToken,
+    const ContainerID& containerId,
+    const Option<CommandInfo>& commandInfo,
+    const Option<Resources>& resources,
+    const Option<ContainerInfo>& containerInfo,
+    const Option<std::function<Future<Nothing>()>>& postStartHook,
+    const Option<std::function<Future<Nothing>()>>& postStopHook)
+  : process(new ContainerDaemonProcess(
+        agentUrl,
+        authToken,
+        containerId,
+        commandInfo,
+        resources,
+        containerInfo,
+        postStartHook,
+        postStopHook))
+{
+  // Spawning runs the process's `initialize()`, which starts the launch
+  // cycle.
+  spawn(CHECK_NOTNULL(process.get()));
+}
+
+
+// Terminates the backing process and waits for it before the `Owned`
+// member releases it. No call is made here to stop the container.
+ContainerDaemon::~ContainerDaemon()
+{
+  process::terminate(process.get());
+  process::wait(process.get());
+}
+
+
+// Forwards to `ContainerDaemonProcess::wait()`.
+Future<Nothing> ContainerDaemon::wait()
+{
+  return process->wait();
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/5c979635/src/slave/container_daemon.hpp
----------------------------------------------------------------------
diff --git a/src/slave/container_daemon.hpp b/src/slave/container_daemon.hpp
new file mode 100644
index 0000000..a58140d
--- /dev/null
+++ b/src/slave/container_daemon.hpp
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __SLAVE_CONTAINER_DAEMON_HPP__
+#define __SLAVE_CONTAINER_DAEMON_HPP__
+
+#include <functional>
+
+#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/owned.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Forward declarations.
+class ContainerDaemonProcess;
+
+
+// A daemon that launches and monitors a service running in a standalone
+// container, and automatically restarts the container when it exits.
+//
+// NOTE: The `ContainerDaemon` itself is not responsible for managing
+// the lifecycle of the service container it monitors.
+class ContainerDaemon
+{
+public:
+  // Creates a container daemon that idempotently launches the container
+  // and then runs the `postStartHook` function. When the container
+  // exits, it executes the `postStopHook` function, then restarts the
+  // launch cycle. Any failed or discarded future returned by the hook
+  // functions will be reflected by the `wait()` method.
+  static Try<process::Owned<ContainerDaemon>> create(
+      const process::http::URL& agentUrl,
+      const Option<std::string>& authToken,
+      const ContainerID& containerId,
+      const Option<CommandInfo>& commandInfo,
+      const Option<Resources>& resources,
+      const Option<ContainerInfo>& containerInfo,
+      const Option<std::function<process::Future<Nothing>()>>& postStartHook =
+        None(),
+      const Option<std::function<process::Future<Nothing>()>>& postStopHook =
+        None());
+
+  ~ContainerDaemon();
+
+  // Returns a future that only reaches a terminal state when a failure
+  // or a discarded future occurs during the launch cycle. This is
+  // intended to capture any loop-breaking error, and the caller should
+  // construct a new daemon instance if they want to retry.
+  process::Future<Nothing> wait();
+
+private:
+  // Private: instances are obtained through `create()`.
+  explicit ContainerDaemon(
+      const process::http::URL& agentUrl,
+      const Option<std::string>& authToken,
+      const ContainerID& containerId,
+      const Option<CommandInfo>& commandInfo,
+      const Option<Resources>& resources,
+      const Option<ContainerInfo>& containerInfo,
+      const Option<std::function<process::Future<Nothing>()>>& postStartHook,
+      const Option<std::function<process::Future<Nothing>()>>& postStopHook);
+
+  // The process that runs the launch cycle.
+  process::Owned<ContainerDaemonProcess> process;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SLAVE_CONTAINER_DAEMON_HPP__