You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/04/25 23:32:27 UTC

[34/50] mesos git commit: Libprocess: Added 'parent_hooks' as an argument to 'subprocess'.

Libprocess: Added 'parent_hooks' as an argument to 'subprocess'.

In the future we will remove the default argument for `parent_hooks`
to force the caller to think about what kind of subprocess they want
to launch.

Review: https://reviews.apache.org/r/43303


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1c6957f6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1c6957f6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1c6957f6

Branch: refs/heads/0.26.x
Commit: 1c6957f63f9311c5122fcb6cfddde2b144f9a6e1
Parents: a450c47
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Feb 4 15:01:12 2016 -0500
Committer: Michael Park <mp...@apache.org>
Committed: Fri Feb 26 20:59:06 2016 -0800

----------------------------------------------------------------------
 .../libprocess/include/process/subprocess.hpp   |  28 ++++-
 3rdparty/libprocess/src/subprocess.cpp          | 104 ++++++++++++++++++-
 2 files changed, 124 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1c6957f6/3rdparty/libprocess/include/process/subprocess.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp
index 0932def..14024eb 100644
--- a/3rdparty/libprocess/include/process/subprocess.hpp
+++ b/3rdparty/libprocess/include/process/subprocess.hpp
@@ -47,6 +47,9 @@ namespace process {
 class Subprocess
 {
 public:
+  // Forward declarations.
+  struct Hook;
+
   /**
    * Describes how the I/O is redirected for stdin/stdout/stderr.
    * One of the following three modes is supported:
@@ -77,7 +80,8 @@ public:
         const Option<std::map<std::string, std::string>>& environment,
         const Option<lambda::function<int()>>& setup,
         const Option<lambda::function<
-            pid_t(const lambda::function<int()>&)>>& clone);
+            pid_t(const lambda::function<int()>&)>>& clone,
+        const std::vector<Subprocess::Hook>& parent_hooks);
 
     enum Mode
     {
@@ -201,7 +205,8 @@ private:
       const Option<std::map<std::string, std::string>>& environment,
       const Option<lambda::function<int()>>& setup,
       const Option<lambda::function<
-          pid_t(const lambda::function<int()>&)>>& clone);
+          pid_t(const lambda::function<int()>&)>>& clone,
+      const std::vector<Subprocess::Hook>& parent_hooks);
 
   struct Data
   {
@@ -255,8 +260,12 @@ private:
  *     async unsafe code in the body of this function.
  * @param clone Function to be invoked in order to fork/clone the
  *     subprocess.
+ * @param parent_hooks Hooks that will be executed in the parent
+ *     before the child execs.
  * @return The subprocess or an error if one occurred.
  */
+// TODO(jmlvanre): Consider removing default argument for
+// `parent_hooks` to force the caller to think about setting them.
 Try<Subprocess> subprocess(
     const std::string& path,
     std::vector<std::string> argv,
@@ -267,7 +276,9 @@ Try<Subprocess> subprocess(
     const Option<std::map<std::string, std::string>>& environment = None(),
     const Option<lambda::function<int()>>& setup = None(),
     const Option<lambda::function<
-        pid_t(const lambda::function<int()>&)>>& clone = None());
+        pid_t(const lambda::function<int()>&)>>& clone = None(),
+    const std::vector<Subprocess::Hook>& parent_hooks =
+      Subprocess::Hook::None());
 
 
 /**
@@ -289,8 +300,12 @@ Try<Subprocess> subprocess(
  *     async unsafe code in the body of this function.
  * @param clone Function to be invoked in order to fork/clone the
  *     subprocess.
+ * @param parent_hooks Hooks that will be executed in the parent
+ *     before the child execs.
  * @return The subprocess or an error if one occurred.
  */
+// TODO(jmlvanre): Consider removing default argument for
+// `parent_hooks` to force the caller to think about setting them.
 inline Try<Subprocess> subprocess(
     const std::string& command,
     const Subprocess::IO& in = Subprocess::FD(STDIN_FILENO),
@@ -299,7 +314,9 @@ inline Try<Subprocess> subprocess(
     const Option<std::map<std::string, std::string>>& environment = None(),
     const Option<lambda::function<int()>>& setup = None(),
     const Option<lambda::function<
-        pid_t(const lambda::function<int()>&)>>& clone = None())
+        pid_t(const lambda::function<int()>&)>>& clone = None(),
+    const std::vector<Subprocess::Hook>& parent_hooks =
+      Subprocess::Hook::None())
 {
   std::vector<std::string> argv = {"sh", "-c", command};
 
@@ -312,7 +329,8 @@ inline Try<Subprocess> subprocess(
       None(),
       environment,
       setup,
-      clone);
+      clone,
+      parent_hooks);
 }
 
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/1c6957f6/3rdparty/libprocess/src/subprocess.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/subprocess.cpp b/3rdparty/libprocess/src/subprocess.cpp
index 3bf3a0b..7caffe2 100644
--- a/3rdparty/libprocess/src/subprocess.cpp
+++ b/3rdparty/libprocess/src/subprocess.cpp
@@ -133,7 +133,9 @@ static int childMain(
     const Option<lambda::function<int()>>& setup,
     int stdinFd[2],
     int stdoutFd[2],
-    int stderrFd[2])
+    int stderrFd[2],
+    bool blocking,
+    int pipes[2])
 {
   // Close parent's end of the pipes.
   if (in.isPipe()) {
@@ -146,6 +148,12 @@ static int childMain(
     ::close(stderrFd[0]);
   }
 
+  // Currently we will block the child's execution of the new process
+  // until all the parent hooks (if any) have executed.
+  if (blocking) {
+    ::close(pipes[1]);
+  }
+
   // Redirect I/O for stdin/stdout/stderr.
   while (::dup2(stdinFd[0], STDIN_FILENO) == -1 && errno == EINTR);
   while (::dup2(stdoutFd[1], STDOUT_FILENO) == -1 && errno == EINTR);
@@ -172,6 +180,22 @@ static int childMain(
     ::close(stderrFd[1]);
   }
 
+  if (blocking) {
+    // Do a blocking read on the pipe until the parent signals us to
+    // continue.
+    char dummy;
+    ssize_t length;
+    while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+          errno == EINTR);
+
+    if (length != sizeof(dummy)) {
+      ABORT("Failed to synchronize with parent");
+    }
+
+    // Now close the pipe as we don't need it anymore.
+    ::close(pipes[0]);
+  }
+
   if (setup.isSome()) {
     int status = setup.get()();
     if (status != 0) {
@@ -195,7 +219,8 @@ Try<Subprocess> subprocess(
     const Option<map<string, string>>& environment,
     const Option<lambda::function<int()>>& setup,
     const Option<lambda::function<
-        pid_t(const lambda::function<int()>&)>>& _clone)
+        pid_t(const lambda::function<int()>&)>>& _clone,
+    const std::vector<Subprocess::Hook>& parent_hooks)
 {
   // File descriptors for redirecting stdin/stdout/stderr. These file
   // descriptors are used for different purposes depending on the
@@ -366,6 +391,17 @@ Try<Subprocess> subprocess(
   lambda::function<pid_t(const lambda::function<int()>&)> clone =
     (_clone.isSome() ? _clone.get() : defaultClone);
 
+  // Currently we will block the child's execution of the new process
+  // until all the `parent_hooks` (if any) have executed.
+  int pipes[2];
+  const bool blocking = !parent_hooks.empty();
+
+  if (blocking) {
+    // We assume this should not fail under reasonable conditions so we
+    // use CHECK.
+    CHECK_EQ(0, ::pipe(pipes));
+  }
+
   // Now, clone the child process.
   pid_t pid = clone(lambda::bind(
       &childMain,
@@ -378,7 +414,9 @@ Try<Subprocess> subprocess(
       setup,
       stdinFd,
       stdoutFd,
-      stderrFd));
+      stderrFd,
+      blocking,
+      pipes));
 
   delete[] _argv;
 
@@ -392,10 +430,70 @@ Try<Subprocess> subprocess(
   if (pid == -1) {
     // Save the errno as 'close' below might overwrite it.
     ErrnoError error("Failed to clone");
+
     internal::close(stdinFd, stdoutFd, stderrFd);
+
+    if (blocking) {
+      os::close(pipes[0]);
+      os::close(pipes[1]);
+    }
+
     return error;
   }
 
+  if (blocking) {
+    os::close(pipes[0]);
+
+    // Run the parent hooks.
+    foreach (const Subprocess::Hook& hook, parent_hooks) {
+      Try<Nothing> callback = hook.parent_callback(pid);
+
+      // If the hook callback fails, we shouldn't proceed with the
+      // execution.
+      if (callback.isError()) {
+        LOG(WARNING)
+          << "Failed to execute Subprocess::Hook in parent for child '"
+          << pid << "': " << callback.error();
+
+        os::close(pipes[1]);
+
+        // Close the child-ends of the file descriptors that are created
+        // by this function.
+        os::close(stdinFd[0]);
+        os::close(stdoutFd[1]);
+        os::close(stderrFd[1]);
+
+        // Ensure the child is killed.
+        ::kill(pid, SIGKILL);
+
+        return Error(
+            "Failed to execute Subprocess::Hook in parent for child '" +
+            stringify(pid) + "': " + callback.error());
+      }
+    }
+
+    // Now that we've executed the parent hooks, we can signal the child to
+    // continue by writing to the pipe.
+    char dummy;
+    ssize_t length;
+    while ((length = ::write(pipes[1], &dummy, sizeof(dummy))) == -1 &&
+           errno == EINTR);
+
+    os::close(pipes[1]);
+
+    if (length != sizeof(dummy)) {
+      // Ensure the child is killed.
+      ::kill(pid, SIGKILL);
+
+      // Close the child-ends of the file descriptors that are created
+      // by this function.
+      os::close(stdinFd[0]);
+      os::close(stdoutFd[1]);
+      os::close(stderrFd[1]);
+      return Error("Failed to synchronize child process");
+    }
+  }
+
   // Parent.
   Subprocess process;
   process.data->pid = pid;