You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2016/03/28 18:19:09 UTC

[3/7] mesos git commit: Subprocess: [3/7] Introduced watchdog option.

Subprocess: [3/7] Introduced watchdog option.

Some newly created processes, such as perf, should be killed if the
parent dies. Currently this is achieved by forking a new process from
the child process; it serves as a 'watchdog' and kills the child if the
parent dies. This review introduces this as a general behavior of
subprocess (and hence removes the need for the custom setup function).

Review: https://reviews.apache.org/r/45232/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a3d1bd49
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a3d1bd49
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a3d1bd49

Branch: refs/heads/master
Commit: a3d1bd49a3456c4be19292293f5bd2c76ac46632
Parents: 5043f0d
Author: Joerg Schad <jo...@mesosphere.io>
Authored: Mon Mar 28 17:02:06 2016 +0200
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Mar 28 18:16:49 2016 +0200

----------------------------------------------------------------------
 .../libprocess/include/process/subprocess.hpp   |  27 ++++-
 3rdparty/libprocess/src/subprocess.cpp          | 113 ++++++++++++++++++-
 .../libprocess/src/tests/subprocess_tests.cpp   | 107 ++----------------
 3 files changed, 137 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a3d1bd49/3rdparty/libprocess/include/process/subprocess.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp
index bf1a504..da80664 100644
--- a/3rdparty/libprocess/include/process/subprocess.hpp
+++ b/3rdparty/libprocess/include/process/subprocess.hpp
@@ -42,6 +42,14 @@ enum Setsid
   NO_SETSID,
 };
 
+// Flag describing whether a new process should be monitored by a separate
+// watchdog process and be killed in case the parent process dies.
+enum WATCHDOG
+{
+  MONITOR,    // Spawn a watchdog for the new process.
+  NO_MONITOR, // Do not spawn a watchdog (the default for 'subprocess').
+};
+
 /**
  * Represents a fork() exec()ed subprocess. Access is provided to the
  * input / output of the process, as well as the exit status. The
@@ -135,7 +143,8 @@ public:
         const Option<lambda::function<
             pid_t(const lambda::function<int()>&)>>& clone,
         const std::vector<Subprocess::Hook>& parent_hooks,
-        const Option<std::string>& working_directory);
+        const Option<std::string>& working_directory,
+        const WATCHDOG watchdog);
 
     IO(const lambda::function<Try<InputFileDescriptors>()>& _input,
        const lambda::function<Try<OutputFileDescriptors>()>& _output)
@@ -237,7 +246,8 @@ private:
       const Option<lambda::function<
           pid_t(const lambda::function<int()>&)>>& clone,
       const std::vector<Subprocess::Hook>& parent_hooks,
-      const Option<std::string>& working_directory);
+      const Option<std::string>& working_directory,
+      const WATCHDOG watchdog);
 
   struct Data
   {
@@ -289,6 +299,8 @@ private:
  *     before the child execs.
  * @param working_directory Directory in which the process should
  *     chdir before exec after the 'parent_hooks' have been executed.
+ * @param watchdog Indicates whether the new process should be monitored
+ *     and killed if the parent process terminates.
  * @return The subprocess or an error if one occured.
  */
 // TODO(jmlvanre): Consider removing default argument for
@@ -306,7 +318,8 @@ Try<Subprocess> subprocess(
         pid_t(const lambda::function<int()>&)>>& clone = None(),
     const std::vector<Subprocess::Hook>& parent_hooks =
       Subprocess::Hook::None(),
-    const Option<std::string>& working_directory = None());
+    const Option<std::string>& working_directory = None(),
+    const WATCHDOG watchdog = NO_MONITOR);
 
 
 /**
@@ -331,6 +344,8 @@ Try<Subprocess> subprocess(
  *     before the child execs.
  * @param working_directory Directory in which the process should
  *     chdir before exec after the 'parent_hooks' have been executed.
+ * @param watchdog Indicates whether the new process should be monitored
+ *     and killed if the parent process terminates.
  * @return The subprocess or an error if one occured.
  */
 // TODO(jmlvanre): Consider removing default argument for
@@ -346,7 +361,8 @@ inline Try<Subprocess> subprocess(
         pid_t(const lambda::function<int()>&)>>& clone = None(),
     const std::vector<Subprocess::Hook>& parent_hooks =
       Subprocess::Hook::None(),
-    const Option<std::string>& working_directory = None())
+    const Option<std::string>& working_directory = None(),
+    const WATCHDOG watchdog = NO_MONITOR)
 {
   std::vector<std::string> argv = {"sh", "-c", command};
 
@@ -361,7 +377,8 @@ inline Try<Subprocess> subprocess(
       environment,
       clone,
       parent_hooks,
-      working_directory);
+      working_directory,
+      watchdog);
 }
 
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/a3d1bd49/3rdparty/libprocess/src/subprocess.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/subprocess.cpp b/3rdparty/libprocess/src/subprocess.cpp
index f0b6306..be32962 100644
--- a/3rdparty/libprocess/src/subprocess.cpp
+++ b/3rdparty/libprocess/src/subprocess.cpp
@@ -10,11 +10,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License
 
+#include <signal.h>
 #include <stdio.h>
 #include <stdlib.h>
-#include <sys/types.h>
 #include <unistd.h>
 
+#ifdef __linux__
+#include <sys/prctl.h>
+#endif // __linux__
+#include <sys/types.h>
+
 #include <string>
 
 #include <glog/logging.h>
@@ -238,8 +243,91 @@ static pid_t defaultClone(const lambda::function<int()>& func)
 }
 
 
-// The main entry of the child process. Note that this function has to
-// be async signal safe.
+static void signalHandler(int signal)
+{
+  // Send SIGKILL to every process in the process group of the
+  // calling process (which includes the calling process itself).
+  kill(0, SIGKILL);
+  abort();  // Fallback in case the SIGKILL above did not end this process.
+}
+
+
+// Creates a separate watchdog process to monitor the child process and
+// kill it in case the parent process dies.
+//
+// NOTE: This function needs to be async signal safe. In fact,
+// all the library functions we use in this function are async
+// signal safe.
+static int watchdogProcess()
+{
+#ifdef __linux__
+  // Send SIGTERM to the current process if the parent (i.e., the
+  // slave) exits.
+  // NOTE: This call should always succeed because we are passing
+  // in a valid signal.
+  prctl(PR_SET_PDEATHSIG, SIGTERM);
+
+  // Put the current process into a separate process group so that
+  // we can kill it and all its children easily.
+  if (setpgid(0, 0) != 0) {
+    abort();
+  }
+
+  // Install a SIGTERM handler which will kill the current process
+  // group. Since we already set up the death signal above, the
+  // signal handler will be triggered when the parent (e.g., the
+  // slave) exits.
+  if (os::signals::install(SIGTERM, &signalHandler) != 0) {
+    abort();
+  }
+
+  pid_t pid = fork();
+  if (pid == -1) {
+    abort();
+  } else if (pid == 0) {
+    // Child. Returning zero lets the caller continue in this
+    // process and exec the actual command.
+
+    // We set up the death signal for this process as well in case
+    // someone, though unlikely, accidentally kills the parent of
+    // this process (the bookkeeping process).
+    prctl(PR_SET_PDEATHSIG, SIGKILL);
+
+    // NOTE: We don't need to clear the signal handler explicitly
+    // because the subsequent 'exec' will reset it.
+    return 0;
+  } else {
+    // Parent. This is the bookkeeping process which will wait for
+    // the child process to finish.
+
+    // Close the files to prevent interference on the communication
+    // between the slave and the child process.
+    close(STDIN_FILENO);
+    close(STDOUT_FILENO);
+    close(STDERR_FILENO);
+
+    // Block until the child process finishes.
+    int status = 0;
+    if (waitpid(pid, &status, 0) == -1) {
+      abort();
+    }
+
+    // Forward the exit status if the child process exits normally.
+    if (WIFEXITED(status)) {
+      _exit(WEXITSTATUS(status));
+    }
+
+    abort();  // The child did not exit normally (WIFEXITED was false).
+    UNREACHABLE();
+  }
+#endif // __linux__
+  return 0;  // Reached only when not built for Linux: no watchdog is spawned.
+}
+
+
+// The main entry of the child process.
+//
+// NOTE: This function has to be async signal safe.
 static int childMain(
     const string& path,
     char** argv,
@@ -250,7 +338,8 @@ static int childMain(
     const OutputFileDescriptors& stderrfds,
     bool blocking,
     int pipes[2],
-    const Option<string>& working_directory)
+    const Option<string>& working_directory,
+    const WATCHDOG watchdog)
 {
   // Close parent's end of the pipes.
   if (stdinfds.write.isSome()) {
@@ -329,6 +418,16 @@ static int childMain(
     }
   }
 
+  // If the child process should die together with its parent we spawn a
+  // separate watchdog process which kills the child when the parent dies.
+  //
+  // NOTE: The watchdog process sets the process group id in order for it and
+  // its child processes to be killed together. We should not (re)set the sid
+  // after this.
+  if (watchdog == MONITOR) {
+    watchdogProcess();
+  }
+
   os::execvpe(path.c_str(), argv, envp);
 
   ABORT("Failed to os::execvpe on path '" + path + "': " + os::strerror(errno));
@@ -347,7 +446,8 @@ Try<Subprocess> subprocess(
     const Option<lambda::function<
         pid_t(const lambda::function<int()>&)>>& _clone,
     const vector<Subprocess::Hook>& parent_hooks,
-    const Option<string>& working_directory)
+    const Option<string>& working_directory,
+    const WATCHDOG watchdog)
 {
   // File descriptors for redirecting stdin/stdout/stderr.
   // These file descriptors are used for different purposes depending
@@ -458,7 +558,8 @@ Try<Subprocess> subprocess(
       stderrfds,
       blocking,
       pipes,
-      working_directory));
+      working_directory,
+      watchdog));
 
   delete[] _argv;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a3d1bd49/3rdparty/libprocess/src/tests/subprocess_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
index 09e46eb..727e940 100644
--- a/3rdparty/libprocess/src/tests/subprocess_tests.cpp
+++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
@@ -573,6 +573,7 @@ TEST_F(SubprocessTest, Flags)
       Subprocess::FD(STDIN_FILENO),
       Subprocess::PATH(out),
       Subprocess::FD(STDERR_FILENO),
+      process::NO_SETSID,
       flags);
 
   ASSERT_SOME(s);
@@ -637,6 +638,7 @@ TEST_F(SubprocessTest, Environment)
       Subprocess::FD(STDIN_FILENO),
       Subprocess::PIPE(),
       Subprocess::FD(STDERR_FILENO),
+      process::NO_SETSID,
       environment);
 
   ASSERT_SOME(s);
@@ -668,6 +670,7 @@ TEST_F(SubprocessTest, Environment)
       Subprocess::FD(STDIN_FILENO),
       Subprocess::PIPE(),
       Subprocess::FD(STDERR_FILENO),
+      process::NO_SETSID,
       environment);
 
   ASSERT_SOME(s);
@@ -702,6 +705,7 @@ TEST_F(SubprocessTest, EnvironmentWithSpaces)
       Subprocess::FD(STDIN_FILENO),
       Subprocess::PIPE(),
       Subprocess::FD(STDERR_FILENO),
+      process::NO_SETSID,
       environment);
 
   ASSERT_SOME(s);
@@ -736,6 +740,7 @@ TEST_F(SubprocessTest, EnvironmentWithSpacesAndQuotes)
       Subprocess::FD(STDIN_FILENO),
       Subprocess::PIPE(),
       Subprocess::FD(STDERR_FILENO),
+      process::NO_SETSID,
       environment);
 
   ASSERT_SOME(s);
@@ -773,6 +778,7 @@ TEST_F(SubprocessTest, EnvironmentOverride)
       Subprocess::FD(STDIN_FILENO),
       Subprocess::PIPE(),
       Subprocess::FD(STDERR_FILENO),
+      process::NO_SETSID,
       environment);
 
   ASSERT_SOME(s);
@@ -807,102 +813,5 @@ static int setupChdir(const string& directory)
 }
 
 
-TEST_F(SubprocessTest, Setup)
-{
-  Try<string> directory = os::mkdtemp();
-  ASSERT_SOME(directory);
-
-  // chdir().
-  Try<Subprocess> s = subprocess(
-      "echo hello world > file",
-      Subprocess::FD(STDIN_FILENO),
-      Subprocess::FD(STDOUT_FILENO),
-      Subprocess::FD(STDERR_FILENO),
-      None(),
-      lambda::bind(&setupChdir, directory.get()));
-
-  ASSERT_SOME(s);
-
-  // Advance time until the internal reaper reaps the subprocess.
-  Clock::pause();
-  while (s.get().status().isPending()) {
-    Clock::advance(MAX_REAP_INTERVAL());
-    Clock::settle();
-  }
-  Clock::resume();
-
-  AWAIT_ASSERT_READY(s.get().status());
-  ASSERT_SOME(s.get().status().get());
-
-  // Make sure 'file' is there and contains 'hello world'.
-  const string path = path::join(directory.get(), "file");
-  EXPECT_TRUE(os::exists(path));
-  EXPECT_SOME_EQ("hello world\n", os::read(path));
-
-  os::rmdir(directory.get());
-}
-
-
-static int setupStatus(int ret)
-{
-  return ret;
-}
-
-
-TEST_F(SubprocessTest, SetupStatus)
-{
-  // Exit 0 && setup 1.
-  Try<Subprocess> s = subprocess(
-      "exit 0",
-      Subprocess::FD(STDIN_FILENO),
-      Subprocess::FD(STDOUT_FILENO),
-      Subprocess::FD(STDERR_FILENO),
-      None(),
-      lambda::bind(&setupStatus, 1));
-
-  ASSERT_SOME(s);
-
-  // Advance time until the internal reaper reaps the subprocess.
-  Clock::pause();
-  while (s.get().status().isPending()) {
-    Clock::advance(MAX_REAP_INTERVAL());
-    Clock::settle();
-  }
-  Clock::resume();
-
-  AWAIT_ASSERT_READY(s.get().status());
-  ASSERT_SOME(s.get().status().get());
-
-  int status = s.get().status().get().get();
-
-  // Verify we received the setup returned value instead of the
-  // command status.
-  ASSERT_EQ(1, WEXITSTATUS(status));
-
-  // Exit 1 && setup 0.
-  s = subprocess(
-      "exit 1",
-      Subprocess::FD(STDIN_FILENO),
-      Subprocess::FD(STDOUT_FILENO),
-      Subprocess::FD(STDERR_FILENO),
-      None(),
-      lambda::bind(&setupStatus, 0));
-
-  ASSERT_SOME(s);
-
-  // Advance time until the internal reaper reaps the subprocess.
-  Clock::pause();
-  while (s.get().status().isPending()) {
-    Clock::advance(MAX_REAP_INTERVAL());
-    Clock::settle();
-  }
-  Clock::resume();
-
-  AWAIT_ASSERT_READY(s.get().status());
-  ASSERT_SOME(s.get().status().get());
-
-  status = s.get().status().get().get();
-
-  // Verify we received the command status.
-  ASSERT_EQ(1, WEXITSTATUS(status));
-}
+// TODO(joerg84): Consider adding tests for setsid, working_directory,
+// and watchdog options.