You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2016/02/10 18:12:14 UTC

[3/8] mesos git commit: Libprocess: Added 'parent_hooks' as an argument to 'subprocess'.

Libprocess: Added 'parent_hooks' as an argument to 'subprocess'.

In the future we will remove the default argument for `parent_hooks`
to force the caller to think about what kind of subprocess they want
to launch.

Review: https://reviews.apache.org/r/43303


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2cf7fc8e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2cf7fc8e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2cf7fc8e

Branch: refs/heads/master
Commit: 2cf7fc8e2b3d64c850a7a1576ad2f59b32e2fefb
Parents: 2ed70dc
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Feb 4 15:01:12 2016 -0500
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Wed Feb 10 18:12:05 2016 +0100

----------------------------------------------------------------------
 .../libprocess/include/process/subprocess.hpp   |  28 ++++-
 3rdparty/libprocess/src/subprocess.cpp          | 103 ++++++++++++++++++-
 2 files changed, 123 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2cf7fc8e/3rdparty/libprocess/include/process/subprocess.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp
index c12d1c5..e0c306a 100644
--- a/3rdparty/libprocess/include/process/subprocess.hpp
+++ b/3rdparty/libprocess/include/process/subprocess.hpp
@@ -46,6 +46,9 @@ namespace process {
 class Subprocess
 {
 public:
+  // Forward declarations.
+  struct Hook;
+
   /**
    * Describes how the I/O is redirected for stdin/stdout/stderr.
    * One of the following three modes is supported:
@@ -123,7 +126,8 @@ public:
         const Option<std::map<std::string, std::string>>& environment,
         const Option<lambda::function<int()>>& setup,
         const Option<lambda::function<
-            pid_t(const lambda::function<int()>&)>>& clone);
+            pid_t(const lambda::function<int()>&)>>& clone,
+        const std::vector<Subprocess::Hook>& parent_hooks);
 
     IO(const lambda::function<Try<InputFileDescriptors>()>& _input,
        const lambda::function<Try<OutputFileDescriptors>()>& _output)
@@ -223,7 +227,8 @@ private:
       const Option<std::map<std::string, std::string>>& environment,
       const Option<lambda::function<int()>>& setup,
       const Option<lambda::function<
-          pid_t(const lambda::function<int()>&)>>& clone);
+          pid_t(const lambda::function<int()>&)>>& clone,
+      const std::vector<Subprocess::Hook>& parent_hooks);
 
   struct Data
   {
@@ -277,8 +282,12 @@ private:
  *     async unsafe code in the body of this function.
  * @param clone Function to be invoked in order to fork/clone the
  *     subprocess.
+ * @param parent_hooks Hooks that will be executed in the parent
+ *     before the child execs.
  * @return The subprocess or an error if one occurred.
  */
+// TODO(jmlvanre): Consider removing default argument for
+// `parent_hooks` to force the caller to think about setting them.
 Try<Subprocess> subprocess(
     const std::string& path,
     std::vector<std::string> argv,
@@ -289,7 +298,9 @@ Try<Subprocess> subprocess(
     const Option<std::map<std::string, std::string>>& environment = None(),
     const Option<lambda::function<int()>>& setup = None(),
     const Option<lambda::function<
-        pid_t(const lambda::function<int()>&)>>& clone = None());
+        pid_t(const lambda::function<int()>&)>>& clone = None(),
+    const std::vector<Subprocess::Hook>& parent_hooks =
+      Subprocess::Hook::None());
 
 
 /**
@@ -311,8 +322,12 @@ Try<Subprocess> subprocess(
  *     async unsafe code in the body of this function.
  * @param clone Function to be invoked in order to fork/clone the
  *     subprocess.
+ * @param parent_hooks Hooks that will be executed in the parent
+ *     before the child execs.
  * @return The subprocess or an error if one occurred.
  */
+// TODO(jmlvanre): Consider removing default argument for
+// `parent_hooks` to force the caller to think about setting them.
 inline Try<Subprocess> subprocess(
     const std::string& command,
     const Subprocess::IO& in = Subprocess::FD(STDIN_FILENO),
@@ -321,7 +336,9 @@ inline Try<Subprocess> subprocess(
     const Option<std::map<std::string, std::string>>& environment = None(),
     const Option<lambda::function<int()>>& setup = None(),
     const Option<lambda::function<
-        pid_t(const lambda::function<int()>&)>>& clone = None())
+        pid_t(const lambda::function<int()>&)>>& clone = None(),
+    const std::vector<Subprocess::Hook>& parent_hooks =
+      Subprocess::Hook::None())
 {
   std::vector<std::string> argv = {"sh", "-c", command};
 
@@ -334,7 +351,8 @@ inline Try<Subprocess> subprocess(
       None(),
       environment,
       setup,
-      clone);
+      clone,
+      parent_hooks);
 }
 
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2cf7fc8e/3rdparty/libprocess/src/subprocess.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/subprocess.cpp b/3rdparty/libprocess/src/subprocess.cpp
index 581fba6..44ca6d0 100644
--- a/3rdparty/libprocess/src/subprocess.cpp
+++ b/3rdparty/libprocess/src/subprocess.cpp
@@ -247,7 +247,9 @@ static int childMain(
     const Option<lambda::function<int()>>& setup,
     const InputFileDescriptors& stdinfds,
     const OutputFileDescriptors& stdoutfds,
-    const OutputFileDescriptors& stderrfds)
+    const OutputFileDescriptors& stderrfds,
+    bool blocking,
+    int pipes[2])
 {
   // Close parent's end of the pipes.
   if (stdinfds.write.isSome()) {
@@ -260,6 +262,12 @@ static int childMain(
     ::close(stderrfds.read.get());
   }
 
+  // Currently we will block the child's execution of the new process
+  // until all the parent hooks (if any) have executed.
+  if (blocking) {
+    ::close(pipes[1]);
+  }
+
   // Redirect I/O for stdin/stdout/stderr.
   while (::dup2(stdinfds.read, STDIN_FILENO) == -1 && errno == EINTR);
   while (::dup2(stdoutfds.write, STDOUT_FILENO) == -1 && errno == EINTR);
@@ -286,6 +294,22 @@ static int childMain(
     ::close(stderrfds.write);
   }
 
+  if (blocking) {
+    // Do a blocking read on the pipe until the parent signals us to
+    // continue.
+    char dummy;
+    ssize_t length;
+    while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+          errno == EINTR);
+
+    if (length != sizeof(dummy)) {
+      ABORT("Failed to synchronize with parent");
+    }
+
+    // Now close the pipe as we don't need it anymore.
+    ::close(pipes[0]);
+  }
+
   if (setup.isSome()) {
     int status = setup.get()();
     if (status != 0) {
@@ -309,7 +333,8 @@ Try<Subprocess> subprocess(
     const Option<map<string, string>>& environment,
     const Option<lambda::function<int()>>& setup,
     const Option<lambda::function<
-        pid_t(const lambda::function<int()>&)>>& _clone)
+        pid_t(const lambda::function<int()>&)>>& _clone,
+    const std::vector<Subprocess::Hook>& parent_hooks)
 {
   // File descriptors for redirecting stdin/stdout/stderr.
   // These file descriptors are used for different purposes depending
@@ -397,6 +422,17 @@ Try<Subprocess> subprocess(
   lambda::function<pid_t(const lambda::function<int()>&)> clone =
     (_clone.isSome() ? _clone.get() : defaultClone);
 
+  // Currently we will block the child's execution of the new process
+  // until all the `parent_hooks` (if any) have executed.
+  int pipes[2];
+  const bool blocking = !parent_hooks.empty();
+
+  if (blocking) {
+    // We assume this should not fail under reasonable conditions so we
+    // use CHECK.
+    CHECK_EQ(0, ::pipe(pipes));
+  }
+
   // Now, clone the child process.
   pid_t pid = clone(lambda::bind(
       &childMain,
@@ -406,7 +442,9 @@ Try<Subprocess> subprocess(
       setup,
       stdinfds,
       stdoutfds,
-      stderrfds));
+      stderrfds,
+      blocking,
+      pipes));
 
   delete[] _argv;
 
@@ -421,9 +459,68 @@ Try<Subprocess> subprocess(
     // Save the errno as 'close' below might overwrite it.
     ErrnoError error("Failed to clone");
     internal::close(stdinfds, stdoutfds, stderrfds);
+
+    if (blocking) {
+      os::close(pipes[0]);
+      os::close(pipes[1]);
+    }
+
     return error;
   }
 
+  if (blocking) {
+    os::close(pipes[0]);
+
+    // Run the parent hooks.
+    foreach (const Subprocess::Hook& hook, parent_hooks) {
+      Try<Nothing> callback = hook.parent_callback(pid);
+
+      // If the hook callback fails, we shouldn't proceed with the
+      // execution.
+      if (callback.isError()) {
+        LOG(WARNING)
+          << "Failed to execute Subprocess::Hook in parent for child '"
+          << pid << "': " << callback.error();
+
+        os::close(pipes[1]);
+
+        // Close the child-ends of the file descriptors that are created
+        // by this function.
+        os::close(stdinfds.read);
+        os::close(stdoutfds.write);
+        os::close(stderrfds.write);
+
+        // Ensure the child is killed.
+        ::kill(pid, SIGKILL);
+
+        return Error(
+            "Failed to execute Subprocess::Hook in parent for child '" +
+            stringify(pid) + "': " + callback.error());
+      }
+    }
+
+    // Now that we've executed the parent hooks, we can signal the child to
+    // continue by writing to the pipe.
+    char dummy;
+    ssize_t length;
+    while ((length = ::write(pipes[1], &dummy, sizeof(dummy))) == -1 &&
+           errno == EINTR);
+
+    os::close(pipes[1]);
+
+    if (length != sizeof(dummy)) {
+      // Ensure the child is killed.
+      ::kill(pid, SIGKILL);
+
+      // Close the child-ends of the file descriptors that are created
+      // by this function.
+      os::close(stdinfds.read);
+      os::close(stdoutfds.write);
+      os::close(stderrfds.write);
+      return Error("Failed to synchronize child process");
+    }
+  }
+
   // Parent.
   Subprocess process;
   process.data->pid = pid;