You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:39:07 UTC

[10/30] mesos git commit: Moved process::io::* into io.cpp.

Moved process::io::* into io.cpp.

Review: https://reviews.apache.org/r/27506


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/37bba65e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/37bba65e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/37bba65e

Branch: refs/heads/master
Commit: 37bba65e897e9e06640b7f126fe4871ab917f1ce
Parents: 413ce94
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 18:28:35 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am     |   1 +
 3rdparty/libprocess/src/io.cpp      | 647 +++++++++++++++++++++++++++++++
 3rdparty/libprocess/src/process.cpp | 633 ------------------------------
 3 files changed, 648 insertions(+), 633 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/37bba65e/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 41c3bd1..aebd281 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -37,6 +37,7 @@ libprocess_la_SOURCES =		\
   src/gate.hpp			\
   src/help.cpp			\
   src/http.cpp			\
+  src/io.cpp			\
   src/latch.cpp			\
   src/libev.hpp			\
   src/libev.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/37bba65e/3rdparty/libprocess/src/io.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
new file mode 100644
index 0000000..75fe75c
--- /dev/null
+++ b/3rdparty/libprocess/src/io.cpp
@@ -0,0 +1,647 @@
+#include <string>
+
+#include <boost/shared_array.hpp>
+
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/process.hpp> // For process::initialize.
+
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
+#include <stout/nothing.hpp>
+#include <stout/os.hpp>
+#include <stout/try.hpp>
+
+using std::string;
+
+namespace process {
+namespace io {
+namespace internal {
+
+void read(
+    int fd,
+    void* data,
+    size_t size,
+    const memory::shared_ptr<Promise<size_t> >& promise,
+    const Future<short>& future)
+{
+  // Ignore this function if the read operation has been discarded.
+  if (promise->future().hasDiscard()) {
+    CHECK(!future.isPending());
+    promise->discard();
+    return;
+  }
+
+  if (size == 0) {
+    promise->set(0);
+    return;
+  }
+
+  if (future.isDiscarded()) {
+    promise->fail("Failed to poll: discarded future");
+  } else if (future.isFailed()) {
+    promise->fail(future.failure());
+  } else {
+    ssize_t length = ::read(fd, data, size);
+    if (length < 0) {
+      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+        // Restart the read operation.
+        Future<short> future =
+          io::poll(fd, process::io::READ).onAny(
+              lambda::bind(&internal::read,
+                           fd,
+                           data,
+                           size,
+                           promise,
+                           lambda::_1));
+
+        // Stop polling if a discard occurs on our future.
+        promise->future().onDiscard(
+            lambda::bind(&process::internal::discard<short>,
+                         WeakFuture<short>(future)));
+      } else {
+        // Error occurred.
+        promise->fail(strerror(errno));
+      }
+    } else {
+      promise->set(length);
+    }
+  }
+}
+
+
+void write(
+    int fd,
+    void* data,
+    size_t size,
+    const memory::shared_ptr<Promise<size_t> >& promise,
+    const Future<short>& future)
+{
+  // Ignore this function if the write operation has been discarded.
+  if (promise->future().hasDiscard()) {
+    promise->discard();
+    return;
+  }
+
+  if (size == 0) {
+    promise->set(0);
+    return;
+  }
+
+  if (future.isDiscarded()) {
+    promise->fail("Failed to poll: discarded future");
+  } else if (future.isFailed()) {
+    promise->fail(future.failure());
+  } else {
+    // Do a write but ignore SIGPIPE so we can return an error when
+    // writing to a pipe or socket where the reading end is closed.
+    // TODO(benh): The 'suppress' macro failed to work on OS X as it
+    // appears that signal delivery was happening asynchronously.
+    // That is, the signal would not appear to be pending when the
+    // 'suppress' block was closed thus the destructor for
+    // 'Suppressor' was not waiting/removing the signal via 'sigwait'.
+    // It also appeared that the signal would be delivered to another
+    // thread even if it remained blocked in this thiread. The
+    // workaround here is to check explicitly for EPIPE and then do
+    // 'sigwait' regardless of what 'os::signals::pending' returns. We
+    // don't have that luxury with 'Suppressor' and arbitrary signals
+    // because we don't always have something like EPIPE to tell us
+    // that a signal is (or will soon be) pending.
+    bool pending = os::signals::pending(SIGPIPE);
+    bool unblock = !pending ? os::signals::block(SIGPIPE) : false;
+
+    ssize_t length = ::write(fd, data, size);
+
+    // Save the errno so we can restore it after doing sig* functions
+    // below.
+    int errno_ = errno;
+
+    if (length < 0 && errno == EPIPE && !pending) {
+      sigset_t mask;
+      sigemptyset(&mask);
+      sigaddset(&mask, SIGPIPE);
+
+      int result;
+      do {
+        int ignored;
+        result = sigwait(&mask, &ignored);
+      } while (result == -1 && errno == EINTR);
+    }
+
+    if (unblock) {
+      os::signals::unblock(SIGPIPE);
+    }
+
+    errno = errno_;
+
+    if (length < 0) {
+      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+        // Restart the write operation.
+        Future<short> future =
+          io::poll(fd, process::io::WRITE).onAny(
+              lambda::bind(&internal::write,
+                           fd,
+                           data,
+                           size,
+                           promise,
+                           lambda::_1));
+
+        // Stop polling if a discard occurs on our future.
+        promise->future().onDiscard(
+            lambda::bind(&process::internal::discard<short>,
+                         WeakFuture<short>(future)));
+      } else {
+        // Error occurred.
+        promise->fail(strerror(errno));
+      }
+    } else {
+      // TODO(benh): Retry if 'length' is 0?
+      promise->set(length);
+    }
+  }
+}
+
+} // namespace internal {
+
+
+Future<size_t> read(int fd, void* data, size_t size)
+{
+  process::initialize();
+
+  memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
+
+  // Check the file descriptor.
+  Try<bool> nonblock = os::isNonblock(fd);
+  if (nonblock.isError()) {
+    // The file descriptor is not valid (e.g., has been closed).
+    promise->fail(
+        "Failed to check if file descriptor was non-blocking: " +
+        nonblock.error());
+    return promise->future();
+  } else if (!nonblock.get()) {
+    // The file descriptor is not non-blocking.
+    promise->fail("Expected a non-blocking file descriptor");
+    return promise->future();
+  }
+
+  // Because the file descriptor is non-blocking, we call read()
+  // immediately. The read may in turn call poll if necessary,
+  // avoiding unnecessary polling. We also observed that for some
+  // combination of libev and Linux kernel versions, the poll would
+  // block for non-deterministically long periods of time. This may be
+  // fixed in a newer version of libev (we use 3.8 at the time of
+  // writing this comment).
+  internal::read(fd, data, size, promise, io::READ);
+
+  return promise->future();
+}
+
+
+Future<size_t> write(int fd, void* data, size_t size)
+{
+  process::initialize();
+
+  memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
+
+  // Check the file descriptor.
+  Try<bool> nonblock = os::isNonblock(fd);
+  if (nonblock.isError()) {
+    // The file descriptor is not valid (e.g., has been closed).
+    promise->fail(
+        "Failed to check if file descriptor was non-blocking: " +
+        nonblock.error());
+    return promise->future();
+  } else if (!nonblock.get()) {
+    // The file descriptor is not non-blocking.
+    promise->fail("Expected a non-blocking file descriptor");
+    return promise->future();
+  }
+
+  // Because the file descriptor is non-blocking, we call write()
+  // immediately. The write may in turn call poll if necessary,
+  // avoiding unnecessary polling. We also observed that for some
+  // combination of libev and Linux kernel versions, the poll would
+  // block for non-deterministically long periods of time. This may be
+  // fixed in a newer version of libev (we use 3.8 at the time of
+  // writing this comment).
+  internal::write(fd, data, size, promise, io::WRITE);
+
+  return promise->future();
+}
+
+
+namespace internal {
+
+#if __cplusplus >= 201103L
+Future<string> _read(
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length)
+{
+  return io::read(fd, data.get(), length)
+    .then([=] (size_t size) -> Future<string> {
+      if (size == 0) { // EOF.
+        return string(*buffer);
+      }
+      buffer->append(data.get(), size);
+      return _read(fd, buffer, data, length);
+    });
+}
+#else
+// Forward declataion.
+Future<string> _read(
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length);
+
+
+Future<string> __read(
+    size_t size,
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length)
+{
+  if (size == 0) { // EOF.
+    return string(*buffer);
+  }
+
+  buffer->append(data.get(), size);
+
+  return _read(fd, buffer, data, length);
+}
+
+
+Future<string> _read(
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length)
+{
+  return io::read(fd, data.get(), length)
+    .then(lambda::bind(&__read, lambda::_1, fd, buffer, data, length));
+}
+#endif // __cplusplus >= 201103L
+
+
+#if __cplusplus >= 201103L
+Future<Nothing> _write(
+    int fd,
+    Owned<string> data,
+    size_t index)
+{
+  return io::write(fd, (void*) (data->data() + index), data->size() - index)
+    .then([=] (size_t length) -> Future<Nothing> {
+      if (index + length == data->size()) {
+        return Nothing();
+      }
+      return _write(fd, data, index + length);
+    });
+}
+#else
+// Forward declaration.
+Future<Nothing> _write(
+    int fd,
+    Owned<string> data,
+    size_t index);
+
+
+Future<Nothing> __write(
+    int fd,
+    Owned<string> data,
+    size_t index,
+    size_t length)
+{
+  if (index + length == data->size()) {
+    return Nothing();
+  }
+  return _write(fd, data, index + length);
+}
+
+
+Future<Nothing> _write(
+    int fd,
+    Owned<string> data,
+    size_t index)
+{
+  return io::write(fd, (void*) (data->data() + index), data->size() - index)
+    .then(lambda::bind(&__write, fd, data, index, lambda::_1));
+}
+#endif // __cplusplus >= 201103L
+
+
+#if __cplusplus >= 201103L
+void _splice(
+    int from,
+    int to,
+    size_t chunk,
+    boost::shared_array<char> data,
+    memory::shared_ptr<Promise<Nothing>> promise)
+{
+  // Stop splicing if a discard occured on our future.
+  if (promise->future().hasDiscard()) {
+    // TODO(benh): Consider returning the number of bytes already
+    // spliced on discarded, or a failure. Same for the 'onDiscarded'
+    // callbacks below.
+    promise->discard();
+    return;
+  }
+
+  // Note that only one of io::read or io::write is outstanding at any
+  // one point in time thus the reuse of 'data' for both operations.
+
+  Future<size_t> read = io::read(from, data.get(), chunk);
+
+  // Stop reading (or potentially indefinitely polling) if a discard
+  // occcurs on our future.
+  promise->future().onDiscard(
+      lambda::bind(&process::internal::discard<size_t>,
+                   WeakFuture<size_t>(read)));
+
+  read
+    .onReady([=] (size_t size) {
+      if (size == 0) { // EOF.
+        promise->set(Nothing());
+      } else {
+        // Note that we always try and complete the write, even if a
+        // discard has occured on our future, in order to provide
+        // semantics where everything read is written. The promise
+        // will eventually be discarded in the next read.
+        io::write(to, string(data.get(), size))
+          .onReady([=] () { _splice(from, to, chunk, data, promise); })
+          .onFailed([=] (const string& message) { promise->fail(message); })
+          .onDiscarded([=] () { promise->discard(); });
+      }
+    })
+    .onFailed([=] (const string& message) { promise->fail(message); })
+    .onDiscarded([=] () { promise->discard(); });
+}
+#else
+// Forward declarations.
+void __splice(
+    int from,
+    int to,
+    size_t chunk,
+    boost::shared_array<char> data,
+    memory::shared_ptr<Promise<Nothing> > promise,
+    size_t size);
+
+void ___splice(
+    memory::shared_ptr<Promise<Nothing> > promise,
+    const string& message);
+
+void ____splice(
+    memory::shared_ptr<Promise<Nothing> > promise);
+
+
+void _splice(
+    int from,
+    int to,
+    size_t chunk,
+    boost::shared_array<char> data,
+    memory::shared_ptr<Promise<Nothing> > promise)
+{
+  // Stop splicing if a discard occured on our future.
+  if (promise->future().hasDiscard()) {
+    // TODO(benh): Consider returning the number of bytes already
+    // spliced on discarded, or a failure. Same for the 'onDiscarded'
+    // callbacks below.
+    promise->discard();
+    return;
+  }
+
+  Future<size_t> read = io::read(from, data.get(), chunk);
+
+  // Stop reading (or potentially indefinitely polling) if a discard
+  // occurs on our future.
+  promise->future().onDiscard(
+      lambda::bind(&process::internal::discard<size_t>,
+                   WeakFuture<size_t>(read)));
+
+  read
+    .onReady(
+        lambda::bind(&__splice, from, to, chunk, data, promise, lambda::_1))
+    .onFailed(lambda::bind(&___splice, promise, lambda::_1))
+    .onDiscarded(lambda::bind(&____splice, promise));
+}
+
+
+void __splice(
+    int from,
+    int to,
+    size_t chunk,
+    boost::shared_array<char> data,
+    memory::shared_ptr<Promise<Nothing> > promise,
+    size_t size)
+{
+  if (size == 0) { // EOF.
+    promise->set(Nothing());
+  } else {
+    // Note that we always try and complete the write, even if a
+    // discard has occured on our future, in order to provide
+    // semantics where everything read is written. The promise will
+    // eventually be discarded in the next read.
+    io::write(to, string(data.get(), size))
+      .onReady(lambda::bind(&_splice, from, to, chunk, data, promise))
+      .onFailed(lambda::bind(&___splice, promise, lambda::_1))
+      .onDiscarded(lambda::bind(&____splice, promise));
+  }
+}
+
+
+void ___splice(
+    memory::shared_ptr<Promise<Nothing> > promise,
+    const string& message)
+{
+  promise->fail(message);
+}
+
+
+void ____splice(
+    memory::shared_ptr<Promise<Nothing> > promise)
+{
+  promise->discard();
+}
+#endif // __cplusplus >= 201103L
+
+
+Future<Nothing> splice(int from, int to, size_t chunk)
+{
+  boost::shared_array<char> data(new char[chunk]);
+
+  // Rather than having internal::_splice return a future and
+  // implementing internal::_splice as a chain of io::read and
+  // io::write calls, we use an explicit promise that we pass around
+  // so that we don't increase memory usage the longer that we splice.
+  memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
+
+  Future<Nothing> future = promise->future();
+
+  _splice(from, to, chunk, data, promise);
+
+  return future;
+}
+
+} // namespace internal {
+
+
+Future<string> read(int fd)
+{
+  process::initialize();
+
+  // Get our own copy of the file descriptor so that we're in control
+  // of the lifetime and don't crash if/when someone by accidently
+  // closes the file descriptor before discarding this future. We can
+  // also make sure it's non-blocking and will close-on-exec. Start by
+  // checking we've got a "valid" file descriptor before dup'ing.
+  if (fd < 0) {
+    return Failure(strerror(EBADF));
+  }
+
+  fd = dup(fd);
+  if (fd == -1) {
+    return Failure(ErrnoError("Failed to duplicate file descriptor"));
+  }
+
+  // Set the close-on-exec flag.
+  Try<Nothing> cloexec = os::cloexec(fd);
+  if (cloexec.isError()) {
+    os::close(fd);
+    return Failure(
+        "Failed to set close-on-exec on duplicated file descriptor: " +
+        cloexec.error());
+  }
+
+  // Make the file descriptor is non-blocking.
+  Try<Nothing> nonblock = os::nonblock(fd);
+  if (nonblock.isError()) {
+    os::close(fd);
+    return Failure(
+        "Failed to make duplicated file descriptor non-blocking: " +
+        nonblock.error());
+  }
+
+  // TODO(benh): Wrap up this data as a struct, use 'Owner'.
+  // TODO(bmahler): For efficiency, use a rope for the buffer.
+  memory::shared_ptr<string> buffer(new string());
+  boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
+
+  return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE)
+    .onAny(lambda::bind(&os::close, fd));
+}
+
+
+Future<Nothing> write(int fd, const std::string& data)
+{
+  process::initialize();
+
+  // Get our own copy of the file descriptor so that we're in control
+  // of the lifetime and don't crash if/when someone by accidently
+  // closes the file descriptor before discarding this future. We can
+  // also make sure it's non-blocking and will close-on-exec. Start by
+  // checking we've got a "valid" file descriptor before dup'ing.
+  if (fd < 0) {
+    return Failure(strerror(EBADF));
+  }
+
+  fd = dup(fd);
+  if (fd == -1) {
+    return Failure(ErrnoError("Failed to duplicate file descriptor"));
+  }
+
+  // Set the close-on-exec flag.
+  Try<Nothing> cloexec = os::cloexec(fd);
+  if (cloexec.isError()) {
+    os::close(fd);
+    return Failure(
+        "Failed to set close-on-exec on duplicated file descriptor: " +
+        cloexec.error());
+  }
+
+  // Make the file descriptor is non-blocking.
+  Try<Nothing> nonblock = os::nonblock(fd);
+  if (nonblock.isError()) {
+    os::close(fd);
+    return Failure(
+        "Failed to make duplicated file descriptor non-blocking: " +
+        nonblock.error());
+  }
+
+  return internal::_write(fd, Owned<string>(new string(data)), 0)
+    .onAny(lambda::bind(&os::close, fd));
+}
+
+
+Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
+{
+  // Make sure we've got "valid" file descriptors.
+  if (from < 0 || (to.isSome() && to.get() < 0)) {
+    return Failure(strerror(EBADF));
+  }
+
+  if (to.isNone()) {
+    // Open up /dev/null that we can splice into.
+    Try<int> open = os::open("/dev/null", O_WRONLY);
+
+    if (open.isError()) {
+      return Failure("Failed to open /dev/null for writing: " + open.error());
+    }
+
+    to = open.get();
+  } else {
+    // Duplicate 'to' so that we're in control of its lifetime.
+    int fd = dup(to.get());
+    if (fd == -1) {
+      return Failure(ErrnoError("Failed to duplicate 'to' file descriptor"));
+    }
+
+    to = fd;
+  }
+
+  CHECK_SOME(to);
+
+  // Duplicate 'from' so that we're in control of its lifetime.
+  from = dup(from);
+  if (from == -1) {
+    return Failure(ErrnoError("Failed to duplicate 'from' file descriptor"));
+  }
+
+  // Set the close-on-exec flag (no-op if already set).
+  Try<Nothing> cloexec = os::cloexec(from);
+  if (cloexec.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to set close-on-exec on 'from': " + cloexec.error());
+  }
+
+  cloexec = os::cloexec(to.get());
+  if (cloexec.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to set close-on-exec on 'to': " + cloexec.error());
+  }
+
+  // Make the file descriptors non-blocking (no-op if already set).
+  Try<Nothing> nonblock = os::nonblock(from);
+  if (nonblock.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to make 'from' non-blocking: " + nonblock.error());
+  }
+
+  nonblock = os::nonblock(to.get());
+  if (nonblock.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to make 'to' non-blocking: " + nonblock.error());
+  }
+
+  return internal::splice(from, to.get(), chunk)
+    .onAny(lambda::bind(&os::close, from))
+    .onAny(lambda::bind(&os::close, to.get()));
+}
+
+} // namespace io {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/37bba65e/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 1fc8874..aeaac0c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -41,8 +41,6 @@
 #include <stdexcept>
 #include <vector>
 
-#include <boost/shared_array.hpp>
-
 #include <process/check.hpp>
 #include <process/clock.hpp>
 #include <process/defer.hpp>
@@ -3115,637 +3113,6 @@ void post(const UPID& from,
 }
 
 
-namespace io {
-namespace internal {
-
-void read(
-    int fd,
-    void* data,
-    size_t size,
-    const memory::shared_ptr<Promise<size_t> >& promise,
-    const Future<short>& future)
-{
-  // Ignore this function if the read operation has been discarded.
-  if (promise->future().hasDiscard()) {
-    CHECK(!future.isPending());
-    promise->discard();
-    return;
-  }
-
-  if (size == 0) {
-    promise->set(0);
-    return;
-  }
-
-  if (future.isDiscarded()) {
-    promise->fail("Failed to poll: discarded future");
-  } else if (future.isFailed()) {
-    promise->fail(future.failure());
-  } else {
-    ssize_t length = ::read(fd, data, size);
-    if (length < 0) {
-      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
-        // Restart the read operation.
-        Future<short> future =
-          io::poll(fd, process::io::READ).onAny(
-              lambda::bind(&internal::read,
-                           fd,
-                           data,
-                           size,
-                           promise,
-                           lambda::_1));
-
-        // Stop polling if a discard occurs on our future.
-        promise->future().onDiscard(
-            lambda::bind(&process::internal::discard<short>,
-                         WeakFuture<short>(future)));
-      } else {
-        // Error occurred.
-        promise->fail(strerror(errno));
-      }
-    } else {
-      promise->set(length);
-    }
-  }
-}
-
-
-void write(
-    int fd,
-    void* data,
-    size_t size,
-    const memory::shared_ptr<Promise<size_t> >& promise,
-    const Future<short>& future)
-{
-  // Ignore this function if the write operation has been discarded.
-  if (promise->future().hasDiscard()) {
-    promise->discard();
-    return;
-  }
-
-  if (size == 0) {
-    promise->set(0);
-    return;
-  }
-
-  if (future.isDiscarded()) {
-    promise->fail("Failed to poll: discarded future");
-  } else if (future.isFailed()) {
-    promise->fail(future.failure());
-  } else {
-    // Do a write but ignore SIGPIPE so we can return an error when
-    // writing to a pipe or socket where the reading end is closed.
-    // TODO(benh): The 'suppress' macro failed to work on OS X as it
-    // appears that signal delivery was happening asynchronously.
-    // That is, the signal would not appear to be pending when the
-    // 'suppress' block was closed thus the destructor for
-    // 'Suppressor' was not waiting/removing the signal via 'sigwait'.
-    // It also appeared that the signal would be delivered to another
-    // thread even if it remained blocked in this thiread. The
-    // workaround here is to check explicitly for EPIPE and then do
-    // 'sigwait' regardless of what 'os::signals::pending' returns. We
-    // don't have that luxury with 'Suppressor' and arbitrary signals
-    // because we don't always have something like EPIPE to tell us
-    // that a signal is (or will soon be) pending.
-    bool pending = os::signals::pending(SIGPIPE);
-    bool unblock = !pending ? os::signals::block(SIGPIPE) : false;
-
-    ssize_t length = ::write(fd, data, size);
-
-    // Save the errno so we can restore it after doing sig* functions
-    // below.
-    int errno_ = errno;
-
-    if (length < 0 && errno == EPIPE && !pending) {
-      sigset_t mask;
-      sigemptyset(&mask);
-      sigaddset(&mask, SIGPIPE);
-
-      int result;
-      do {
-        int ignored;
-        result = sigwait(&mask, &ignored);
-      } while (result == -1 && errno == EINTR);
-    }
-
-    if (unblock) {
-      os::signals::unblock(SIGPIPE);
-    }
-
-    errno = errno_;
-
-    if (length < 0) {
-      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
-        // Restart the write operation.
-        Future<short> future =
-          io::poll(fd, process::io::WRITE).onAny(
-              lambda::bind(&internal::write,
-                           fd,
-                           data,
-                           size,
-                           promise,
-                           lambda::_1));
-
-        // Stop polling if a discard occurs on our future.
-        promise->future().onDiscard(
-            lambda::bind(&process::internal::discard<short>,
-                         WeakFuture<short>(future)));
-      } else {
-        // Error occurred.
-        promise->fail(strerror(errno));
-      }
-    } else {
-      // TODO(benh): Retry if 'length' is 0?
-      promise->set(length);
-    }
-  }
-}
-
-} // namespace internal {
-
-
-Future<size_t> read(int fd, void* data, size_t size)
-{
-  process::initialize();
-
-  memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
-
-  // Check the file descriptor.
-  Try<bool> nonblock = os::isNonblock(fd);
-  if (nonblock.isError()) {
-    // The file descriptor is not valid (e.g., has been closed).
-    promise->fail(
-        "Failed to check if file descriptor was non-blocking: " +
-        nonblock.error());
-    return promise->future();
-  } else if (!nonblock.get()) {
-    // The file descriptor is not non-blocking.
-    promise->fail("Expected a non-blocking file descriptor");
-    return promise->future();
-  }
-
-  // Because the file descriptor is non-blocking, we call read()
-  // immediately. The read may in turn call poll if necessary,
-  // avoiding unnecessary polling. We also observed that for some
-  // combination of libev and Linux kernel versions, the poll would
-  // block for non-deterministically long periods of time. This may be
-  // fixed in a newer version of libev (we use 3.8 at the time of
-  // writing this comment).
-  internal::read(fd, data, size, promise, io::READ);
-
-  return promise->future();
-}
-
-
-Future<size_t> write(int fd, void* data, size_t size)
-{
-  process::initialize();
-
-  memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
-
-  // Check the file descriptor.
-  Try<bool> nonblock = os::isNonblock(fd);
-  if (nonblock.isError()) {
-    // The file descriptor is not valid (e.g., has been closed).
-    promise->fail(
-        "Failed to check if file descriptor was non-blocking: " +
-        nonblock.error());
-    return promise->future();
-  } else if (!nonblock.get()) {
-    // The file descriptor is not non-blocking.
-    promise->fail("Expected a non-blocking file descriptor");
-    return promise->future();
-  }
-
-  // Because the file descriptor is non-blocking, we call write()
-  // immediately. The write may in turn call poll if necessary,
-  // avoiding unnecessary polling. We also observed that for some
-  // combination of libev and Linux kernel versions, the poll would
-  // block for non-deterministically long periods of time. This may be
-  // fixed in a newer version of libev (we use 3.8 at the time of
-  // writing this comment).
-  internal::write(fd, data, size, promise, io::WRITE);
-
-  return promise->future();
-}
-
-
-namespace internal {
-
-#if __cplusplus >= 201103L
-Future<string> _read(
-    int fd,
-    const memory::shared_ptr<string>& buffer,
-    const boost::shared_array<char>& data,
-    size_t length)
-{
-  return io::read(fd, data.get(), length)
-    .then([=] (size_t size) -> Future<string> {
-      if (size == 0) { // EOF.
-        return string(*buffer);
-      }
-      buffer->append(data.get(), size);
-      return _read(fd, buffer, data, length);
-    });
-}
-#else
-// Forward declataion.
-Future<string> _read(
-    int fd,
-    const memory::shared_ptr<string>& buffer,
-    const boost::shared_array<char>& data,
-    size_t length);
-
-
-Future<string> __read(
-    size_t size,
-    int fd,
-    const memory::shared_ptr<string>& buffer,
-    const boost::shared_array<char>& data,
-    size_t length)
-{
-  if (size == 0) { // EOF.
-    return string(*buffer);
-  }
-
-  buffer->append(data.get(), size);
-
-  return _read(fd, buffer, data, length);
-}
-
-
-Future<string> _read(
-    int fd,
-    const memory::shared_ptr<string>& buffer,
-    const boost::shared_array<char>& data,
-    size_t length)
-{
-  return io::read(fd, data.get(), length)
-    .then(lambda::bind(&__read, lambda::_1, fd, buffer, data, length));
-}
-#endif // __cplusplus >= 201103L
-
-
-#if __cplusplus >= 201103L
-Future<Nothing> _write(
-    int fd,
-    Owned<string> data,
-    size_t index)
-{
-  return io::write(fd, (void*) (data->data() + index), data->size() - index)
-    .then([=] (size_t length) -> Future<Nothing> {
-      if (index + length == data->size()) {
-        return Nothing();
-      }
-      return _write(fd, data, index + length);
-    });
-}
-#else
-// Forward declaration.
-Future<Nothing> _write(
-    int fd,
-    Owned<string> data,
-    size_t index);
-
-
-Future<Nothing> __write(
-    int fd,
-    Owned<string> data,
-    size_t index,
-    size_t length)
-{
-  if (index + length == data->size()) {
-    return Nothing();
-  }
-  return _write(fd, data, index + length);
-}
-
-
-Future<Nothing> _write(
-    int fd,
-    Owned<string> data,
-    size_t index)
-{
-  return io::write(fd, (void*) (data->data() + index), data->size() - index)
-    .then(lambda::bind(&__write, fd, data, index, lambda::_1));
-}
-#endif // __cplusplus >= 201103L
-
-
-#if __cplusplus >= 201103L
-void _splice(
-    int from,
-    int to,
-    size_t chunk,
-    boost::shared_array<char> data,
-    memory::shared_ptr<Promise<Nothing>> promise)
-{
-  // Stop splicing if a discard occured on our future.
-  if (promise->future().hasDiscard()) {
-    // TODO(benh): Consider returning the number of bytes already
-    // spliced on discarded, or a failure. Same for the 'onDiscarded'
-    // callbacks below.
-    promise->discard();
-    return;
-  }
-
-  // Note that only one of io::read or io::write is outstanding at any
-  // one point in time thus the reuse of 'data' for both operations.
-
-  Future<size_t> read = io::read(from, data.get(), chunk);
-
-  // Stop reading (or potentially indefinitely polling) if a discard
-  // occcurs on our future.
-  promise->future().onDiscard(
-      lambda::bind(&process::internal::discard<size_t>,
-                   WeakFuture<size_t>(read)));
-
-  read
-    .onReady([=] (size_t size) {
-      if (size == 0) { // EOF.
-        promise->set(Nothing());
-      } else {
-        // Note that we always try and complete the write, even if a
-        // discard has occured on our future, in order to provide
-        // semantics where everything read is written. The promise
-        // will eventually be discarded in the next read.
-        io::write(to, string(data.get(), size))
-          .onReady([=] () { _splice(from, to, chunk, data, promise); })
-          .onFailed([=] (const string& message) { promise->fail(message); })
-          .onDiscarded([=] () { promise->discard(); });
-      }
-    })
-    .onFailed([=] (const string& message) { promise->fail(message); })
-    .onDiscarded([=] () { promise->discard(); });
-}
-#else
-// Forward declarations.
-void __splice(
-    int from,
-    int to,
-    size_t chunk,
-    boost::shared_array<char> data,
-    memory::shared_ptr<Promise<Nothing> > promise,
-    size_t size);
-
-void ___splice(
-    memory::shared_ptr<Promise<Nothing> > promise,
-    const string& message);
-
-void ____splice(
-    memory::shared_ptr<Promise<Nothing> > promise);
-
-
-void _splice(
-    int from,
-    int to,
-    size_t chunk,
-    boost::shared_array<char> data,
-    memory::shared_ptr<Promise<Nothing> > promise)
-{
-  // Stop splicing if a discard occured on our future.
-  if (promise->future().hasDiscard()) {
-    // TODO(benh): Consider returning the number of bytes already
-    // spliced on discarded, or a failure. Same for the 'onDiscarded'
-    // callbacks below.
-    promise->discard();
-    return;
-  }
-
-  Future<size_t> read = io::read(from, data.get(), chunk);
-
-  // Stop reading (or potentially indefinitely polling) if a discard
-  // occurs on our future.
-  promise->future().onDiscard(
-      lambda::bind(&process::internal::discard<size_t>,
-                   WeakFuture<size_t>(read)));
-
-  read
-    .onReady(
-        lambda::bind(&__splice, from, to, chunk, data, promise, lambda::_1))
-    .onFailed(lambda::bind(&___splice, promise, lambda::_1))
-    .onDiscarded(lambda::bind(&____splice, promise));
-}
-
-
-void __splice(
-    int from,
-    int to,
-    size_t chunk,
-    boost::shared_array<char> data,
-    memory::shared_ptr<Promise<Nothing> > promise,
-    size_t size)
-{
-  if (size == 0) { // EOF.
-    promise->set(Nothing());
-  } else {
-    // Note that we always try and complete the write, even if a
-    // discard has occured on our future, in order to provide
-    // semantics where everything read is written. The promise will
-    // eventually be discarded in the next read.
-    io::write(to, string(data.get(), size))
-      .onReady(lambda::bind(&_splice, from, to, chunk, data, promise))
-      .onFailed(lambda::bind(&___splice, promise, lambda::_1))
-      .onDiscarded(lambda::bind(&____splice, promise));
-  }
-}
-
-
-void ___splice(
-    memory::shared_ptr<Promise<Nothing> > promise,
-    const string& message)
-{
-  promise->fail(message);
-}
-
-
-void ____splice(
-    memory::shared_ptr<Promise<Nothing> > promise)
-{
-  promise->discard();
-}
-#endif // __cplusplus >= 201103L
-
-
-Future<Nothing> splice(int from, int to, size_t chunk)
-{
-  boost::shared_array<char> data(new char[chunk]);
-
-  // Rather than having internal::_splice return a future and
-  // implementing internal::_splice as a chain of io::read and
-  // io::write calls, we use an explicit promise that we pass around
-  // so that we don't increase memory usage the longer that we splice.
-  memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
-
-  Future<Nothing> future = promise->future();
-
-  _splice(from, to, chunk, data, promise);
-
-  return future;
-}
-
-} // namespace internal {
-
-
-Future<string> read(int fd)
-{
-  process::initialize();
-
-  // Get our own copy of the file descriptor so that we're in control
-  // of the lifetime and don't crash if/when someone by accidently
-  // closes the file descriptor before discarding this future. We can
-  // also make sure it's non-blocking and will close-on-exec. Start by
-  // checking we've got a "valid" file descriptor before dup'ing.
-  if (fd < 0) {
-    return Failure(strerror(EBADF));
-  }
-
-  fd = dup(fd);
-  if (fd == -1) {
-    return Failure(ErrnoError("Failed to duplicate file descriptor"));
-  }
-
-  // Set the close-on-exec flag.
-  Try<Nothing> cloexec = os::cloexec(fd);
-  if (cloexec.isError()) {
-    os::close(fd);
-    return Failure(
-        "Failed to set close-on-exec on duplicated file descriptor: " +
-        cloexec.error());
-  }
-
-  // Make the file descriptor is non-blocking.
-  Try<Nothing> nonblock = os::nonblock(fd);
-  if (nonblock.isError()) {
-    os::close(fd);
-    return Failure(
-        "Failed to make duplicated file descriptor non-blocking: " +
-        nonblock.error());
-  }
-
-  // TODO(benh): Wrap up this data as a struct, use 'Owner'.
-  // TODO(bmahler): For efficiency, use a rope for the buffer.
-  memory::shared_ptr<string> buffer(new string());
-  boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
-
-  return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE)
-    .onAny(lambda::bind(&os::close, fd));
-}
-
-
-Future<Nothing> write(int fd, const std::string& data)
-{
-  process::initialize();
-
-  // Get our own copy of the file descriptor so that we're in control
-  // of the lifetime and don't crash if/when someone by accidently
-  // closes the file descriptor before discarding this future. We can
-  // also make sure it's non-blocking and will close-on-exec. Start by
-  // checking we've got a "valid" file descriptor before dup'ing.
-  if (fd < 0) {
-    return Failure(strerror(EBADF));
-  }
-
-  fd = dup(fd);
-  if (fd == -1) {
-    return Failure(ErrnoError("Failed to duplicate file descriptor"));
-  }
-
-  // Set the close-on-exec flag.
-  Try<Nothing> cloexec = os::cloexec(fd);
-  if (cloexec.isError()) {
-    os::close(fd);
-    return Failure(
-        "Failed to set close-on-exec on duplicated file descriptor: " +
-        cloexec.error());
-  }
-
-  // Make the file descriptor is non-blocking.
-  Try<Nothing> nonblock = os::nonblock(fd);
-  if (nonblock.isError()) {
-    os::close(fd);
-    return Failure(
-        "Failed to make duplicated file descriptor non-blocking: " +
-        nonblock.error());
-  }
-
-  return internal::_write(fd, Owned<string>(new string(data)), 0)
-    .onAny(lambda::bind(&os::close, fd));
-}
-
-
-Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
-{
-  // Make sure we've got "valid" file descriptors.
-  if (from < 0 || (to.isSome() && to.get() < 0)) {
-    return Failure(strerror(EBADF));
-  }
-
-  if (to.isNone()) {
-    // Open up /dev/null that we can splice into.
-    Try<int> open = os::open("/dev/null", O_WRONLY);
-
-    if (open.isError()) {
-      return Failure("Failed to open /dev/null for writing: " + open.error());
-    }
-
-    to = open.get();
-  } else {
-    // Duplicate 'to' so that we're in control of its lifetime.
-    int fd = dup(to.get());
-    if (fd == -1) {
-      return Failure(ErrnoError("Failed to duplicate 'to' file descriptor"));
-    }
-
-    to = fd;
-  }
-
-  CHECK_SOME(to);
-
-  // Duplicate 'from' so that we're in control of its lifetime.
-  from = dup(from);
-  if (from == -1) {
-    return Failure(ErrnoError("Failed to duplicate 'from' file descriptor"));
-  }
-
-  // Set the close-on-exec flag (no-op if already set).
-  Try<Nothing> cloexec = os::cloexec(from);
-  if (cloexec.isError()) {
-    os::close(from);
-    os::close(to.get());
-    return Failure("Failed to set close-on-exec on 'from': " + cloexec.error());
-  }
-
-  cloexec = os::cloexec(to.get());
-  if (cloexec.isError()) {
-    os::close(from);
-    os::close(to.get());
-    return Failure("Failed to set close-on-exec on 'to': " + cloexec.error());
-  }
-
-  // Make the file descriptors non-blocking (no-op if already set).
-  Try<Nothing> nonblock = os::nonblock(from);
-  if (nonblock.isError()) {
-    os::close(from);
-    os::close(to.get());
-    return Failure("Failed to make 'from' non-blocking: " + nonblock.error());
-  }
-
-  nonblock = os::nonblock(to.get());
-  if (nonblock.isError()) {
-    os::close(from);
-    os::close(to.get());
-    return Failure("Failed to make 'to' non-blocking: " + nonblock.error());
-  }
-
-  return internal::splice(from, to.get(), chunk)
-    .onAny(lambda::bind(&os::close, from))
-    .onAny(lambda::bind(&os::close, to.get()));
-}
-
-} // namespace io {
-
-
 namespace inject {
 
 bool exited(const UPID& from, const UPID& to)