You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2016/02/10 18:12:14 UTC
[3/8] mesos git commit: Libprocess: Added 'parent_hooks' as an
argument to 'subprocess'.
Libprocess: Added 'parent_hooks' as an argument to 'subprocess'.
In the future we will remove the default argument for `parent_hooks`
to force the caller to think about what kind of subprocess they want
to launch.
Review: https://reviews.apache.org/r/43303
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2cf7fc8e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2cf7fc8e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2cf7fc8e
Branch: refs/heads/master
Commit: 2cf7fc8e2b3d64c850a7a1576ad2f59b32e2fefb
Parents: 2ed70dc
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Feb 4 15:01:12 2016 -0500
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Wed Feb 10 18:12:05 2016 +0100
----------------------------------------------------------------------
.../libprocess/include/process/subprocess.hpp | 28 ++++-
3rdparty/libprocess/src/subprocess.cpp | 103 ++++++++++++++++++-
2 files changed, 123 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2cf7fc8e/3rdparty/libprocess/include/process/subprocess.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp
index c12d1c5..e0c306a 100644
--- a/3rdparty/libprocess/include/process/subprocess.hpp
+++ b/3rdparty/libprocess/include/process/subprocess.hpp
@@ -46,6 +46,9 @@ namespace process {
class Subprocess
{
public:
+ // Forward declarations.
+ struct Hook;
+
/**
* Describes how the I/O is redirected for stdin/stdout/stderr.
* One of the following three modes is supported:
@@ -123,7 +126,8 @@ public:
const Option<std::map<std::string, std::string>>& environment,
const Option<lambda::function<int()>>& setup,
const Option<lambda::function<
- pid_t(const lambda::function<int()>&)>>& clone);
+ pid_t(const lambda::function<int()>&)>>& clone,
+ const std::vector<Subprocess::Hook>& parent_hooks);
IO(const lambda::function<Try<InputFileDescriptors>()>& _input,
const lambda::function<Try<OutputFileDescriptors>()>& _output)
@@ -223,7 +227,8 @@ private:
const Option<std::map<std::string, std::string>>& environment,
const Option<lambda::function<int()>>& setup,
const Option<lambda::function<
- pid_t(const lambda::function<int()>&)>>& clone);
+ pid_t(const lambda::function<int()>&)>>& clone,
+ const std::vector<Subprocess::Hook>& parent_hooks);
struct Data
{
@@ -277,8 +282,12 @@ private:
* async unsafe code in the body of this function.
* @param clone Function to be invoked in order to fork/clone the
* subprocess.
+ * @param parent_hooks Hooks that will be executed in the parent
+ * before the child execs.
* @return The subprocess or an error if one occurred.
*/
+// TODO(jmlvanre): Consider removing default argument for
+// `parent_hooks` to force the caller to think about setting them.
Try<Subprocess> subprocess(
const std::string& path,
std::vector<std::string> argv,
@@ -289,7 +298,9 @@ Try<Subprocess> subprocess(
const Option<std::map<std::string, std::string>>& environment = None(),
const Option<lambda::function<int()>>& setup = None(),
const Option<lambda::function<
- pid_t(const lambda::function<int()>&)>>& clone = None());
+ pid_t(const lambda::function<int()>&)>>& clone = None(),
+ const std::vector<Subprocess::Hook>& parent_hooks =
+ Subprocess::Hook::None());
/**
@@ -311,8 +322,12 @@ Try<Subprocess> subprocess(
* async unsafe code in the body of this function.
* @param clone Function to be invoked in order to fork/clone the
* subprocess.
+ * @param parent_hooks Hooks that will be executed in the parent
+ * before the child execs.
* @return The subprocess or an error if one occurred.
*/
+// TODO(jmlvanre): Consider removing default argument for
+// `parent_hooks` to force the caller to think about setting them.
inline Try<Subprocess> subprocess(
const std::string& command,
const Subprocess::IO& in = Subprocess::FD(STDIN_FILENO),
@@ -321,7 +336,9 @@ inline Try<Subprocess> subprocess(
const Option<std::map<std::string, std::string>>& environment = None(),
const Option<lambda::function<int()>>& setup = None(),
const Option<lambda::function<
- pid_t(const lambda::function<int()>&)>>& clone = None())
+ pid_t(const lambda::function<int()>&)>>& clone = None(),
+ const std::vector<Subprocess::Hook>& parent_hooks =
+ Subprocess::Hook::None())
{
std::vector<std::string> argv = {"sh", "-c", command};
@@ -334,7 +351,8 @@ inline Try<Subprocess> subprocess(
None(),
environment,
setup,
- clone);
+ clone,
+ parent_hooks);
}
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2cf7fc8e/3rdparty/libprocess/src/subprocess.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/subprocess.cpp b/3rdparty/libprocess/src/subprocess.cpp
index 581fba6..44ca6d0 100644
--- a/3rdparty/libprocess/src/subprocess.cpp
+++ b/3rdparty/libprocess/src/subprocess.cpp
@@ -247,7 +247,9 @@ static int childMain(
const Option<lambda::function<int()>>& setup,
const InputFileDescriptors& stdinfds,
const OutputFileDescriptors& stdoutfds,
- const OutputFileDescriptors& stderrfds)
+ const OutputFileDescriptors& stderrfds,
+ bool blocking,
+ int pipes[2])
{
// Close parent's end of the pipes.
if (stdinfds.write.isSome()) {
@@ -260,6 +262,12 @@ static int childMain(
::close(stderrfds.read.get());
}
+ // Currently we will block the child's execution of the new process
+ // until all the parent hooks (if any) have executed.
+ if (blocking) {
+ ::close(pipes[1]);
+ }
+
// Redirect I/O for stdin/stdout/stderr.
while (::dup2(stdinfds.read, STDIN_FILENO) == -1 && errno == EINTR);
while (::dup2(stdoutfds.write, STDOUT_FILENO) == -1 && errno == EINTR);
@@ -286,6 +294,22 @@ static int childMain(
::close(stderrfds.write);
}
+ if (blocking) {
+ // Do a blocking read on the pipe until the parent signals us to
+ // continue.
+ char dummy;
+ ssize_t length;
+ while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+ errno == EINTR);
+
+ if (length != sizeof(dummy)) {
+ ABORT("Failed to synchronize with parent");
+ }
+
+ // Now close the pipe as we don't need it anymore.
+ ::close(pipes[0]);
+ }
+
if (setup.isSome()) {
int status = setup.get()();
if (status != 0) {
@@ -309,7 +333,8 @@ Try<Subprocess> subprocess(
const Option<map<string, string>>& environment,
const Option<lambda::function<int()>>& setup,
const Option<lambda::function<
- pid_t(const lambda::function<int()>&)>>& _clone)
+ pid_t(const lambda::function<int()>&)>>& _clone,
+ const std::vector<Subprocess::Hook>& parent_hooks)
{
// File descriptors for redirecting stdin/stdout/stderr.
// These file descriptors are used for different purposes depending
@@ -397,6 +422,17 @@ Try<Subprocess> subprocess(
lambda::function<pid_t(const lambda::function<int()>&)> clone =
(_clone.isSome() ? _clone.get() : defaultClone);
+ // Currently we will block the child's execution of the new process
+ // until all the `parent_hooks` (if any) have executed.
+ int pipes[2];
+ const bool blocking = !parent_hooks.empty();
+
+ if (blocking) {
+ // We assume this should not fail under reasonable conditions so we
+ // use CHECK.
+ CHECK_EQ(0, ::pipe(pipes));
+ }
+
// Now, clone the child process.
pid_t pid = clone(lambda::bind(
&childMain,
@@ -406,7 +442,9 @@ Try<Subprocess> subprocess(
setup,
stdinfds,
stdoutfds,
- stderrfds));
+ stderrfds,
+ blocking,
+ pipes));
delete[] _argv;
@@ -421,9 +459,68 @@ Try<Subprocess> subprocess(
// Save the errno as 'close' below might overwrite it.
ErrnoError error("Failed to clone");
internal::close(stdinfds, stdoutfds, stderrfds);
+
+ if (blocking) {
+ os::close(pipes[0]);
+ os::close(pipes[1]);
+ }
+
return error;
}
+ if (blocking) {
+ os::close(pipes[0]);
+
+ // Run the parent hooks.
+ foreach (const Subprocess::Hook& hook, parent_hooks) {
+ Try<Nothing> callback = hook.parent_callback(pid);
+
+ // If the hook callback fails, we shouldn't proceed with the
+ // execution.
+ if (callback.isError()) {
+ LOG(WARNING)
+ << "Failed to execute Subprocess::Hook in parent for child '"
+ << pid << "': " << callback.error();
+
+ os::close(pipes[1]);
+
+ // Close the child-ends of the file descriptors that are created
+ // by this function.
+ os::close(stdinfds.read);
+ os::close(stdoutfds.write);
+ os::close(stderrfds.write);
+
+ // Ensure the child is killed.
+ ::kill(pid, SIGKILL);
+
+ return Error(
+ "Failed to execute Subprocess::Hook in parent for child '" +
+ stringify(pid) + "': " + callback.error());
+ }
+ }
+
+ // Now that we've executed the parent hooks, we can signal the child to
+ // continue by writing to the pipe.
+ char dummy;
+ ssize_t length;
+ while ((length = ::write(pipes[1], &dummy, sizeof(dummy))) == -1 &&
+ errno == EINTR);
+
+ os::close(pipes[1]);
+
+ if (length != sizeof(dummy)) {
+ // Ensure the child is killed.
+ ::kill(pid, SIGKILL);
+
+ // Close the child-ends of the file descriptors that are created
+ // by this function.
+ os::close(stdinfds.read);
+ os::close(stdoutfds.write);
+ os::close(stderrfds.write);
+ return Error("Failed to synchronize child process");
+ }
+ }
+
// Parent.
Subprocess process;
process.data->pid = pid;