You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:39:07 UTC
[10/30] mesos git commit: Moved process::io::* into io.cpp.
Moved process::io::* into io.cpp.
Review: https://reviews.apache.org/r/27506
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/37bba65e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/37bba65e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/37bba65e
Branch: refs/heads/master
Commit: 37bba65e897e9e06640b7f126fe4871ab917f1ce
Parents: 413ce94
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 18:28:35 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/src/io.cpp | 647 +++++++++++++++++++++++++++++++
3rdparty/libprocess/src/process.cpp | 633 ------------------------------
3 files changed, 648 insertions(+), 633 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/37bba65e/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 41c3bd1..aebd281 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -37,6 +37,7 @@ libprocess_la_SOURCES = \
src/gate.hpp \
src/help.cpp \
src/http.cpp \
+ src/io.cpp \
src/latch.cpp \
src/libev.hpp \
src/libev.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/37bba65e/3rdparty/libprocess/src/io.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
new file mode 100644
index 0000000..75fe75c
--- /dev/null
+++ b/3rdparty/libprocess/src/io.cpp
@@ -0,0 +1,647 @@
+#include <string>
+
+#include <boost/shared_array.hpp>
+
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/process.hpp> // For process::initialize.
+
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
+#include <stout/nothing.hpp>
+#include <stout/os.hpp>
+#include <stout/try.hpp>
+
+using std::string;
+
+namespace process {
+namespace io {
+namespace internal {
+
+// Performs a single ::read of up to 'size' bytes from 'fd' into 'data'
+// and completes 'promise' with the outcome. 'future' is the poll result
+// that triggered this invocation. When the descriptor is not ready
+// (EINTR, EAGAIN or EWOULDBLOCK) the function re-registers itself as
+// the continuation of a new io::poll and leaves 'promise' pending.
+void read(
+    int fd,
+    void* data,
+    size_t size,
+    const memory::shared_ptr<Promise<size_t> >& promise,
+    const Future<short>& future)
+{
+  // A discard was requested on the read operation: honor it and stop.
+  if (promise->future().hasDiscard()) {
+    CHECK(!future.isPending());
+    promise->discard();
+    return;
+  }
+
+  // Nothing was asked for, so complete right away.
+  if (size == 0) {
+    promise->set(0);
+    return;
+  }
+
+  // Propagate an unsuccessful poll to the caller.
+  if (future.isDiscarded()) {
+    promise->fail("Failed to poll: discarded future");
+    return;
+  }
+
+  if (future.isFailed()) {
+    promise->fail(future.failure());
+    return;
+  }
+
+  const ssize_t length = ::read(fd, data, size);
+
+  if (length >= 0) {
+    promise->set(length);
+    return;
+  }
+
+  const bool retry =
+    errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK;
+
+  if (!retry) {
+    // A genuine error occurred.
+    promise->fail(strerror(errno));
+    return;
+  }
+
+  // Not ready yet: poll for readability and come back here afterwards.
+  Future<short> polling = io::poll(fd, process::io::READ)
+    .onAny(lambda::bind(
+        &internal::read, fd, data, size, promise, lambda::_1));
+
+  // Cancel the poll if a discard is requested on our future.
+  promise->future().onDiscard(
+      lambda::bind(&process::internal::discard<short>,
+                   WeakFuture<short>(polling)));
+}
+
+
+// Performs a single ::write of up to 'size' bytes from 'data' to 'fd'
+// and completes 'promise' with the number of bytes written, or fails
+// it with the error text. 'future' is the poll result that triggered
+// this invocation. When the descriptor is not ready (EINTR, EAGAIN or
+// EWOULDBLOCK) the function re-registers itself as the continuation of
+// a new io::poll and leaves 'promise' pending.
+void write(
+    int fd,
+    void* data,
+    size_t size,
+    const memory::shared_ptr<Promise<size_t> >& promise,
+    const Future<short>& future)
+{
+  // Ignore this function if the write operation has been discarded.
+  if (promise->future().hasDiscard()) {
+    promise->discard();
+    return;
+  }
+
+  if (size == 0) {
+    promise->set(0);
+    return;
+  }
+
+  if (future.isDiscarded()) {
+    promise->fail("Failed to poll: discarded future");
+  } else if (future.isFailed()) {
+    promise->fail(future.failure());
+  } else {
+    // Do a write but ignore SIGPIPE so we can return an error when
+    // writing to a pipe or socket where the reading end is closed.
+    // TODO(benh): The 'suppress' macro failed to work on OS X as it
+    // appears that signal delivery was happening asynchronously.
+    // That is, the signal would not appear to be pending when the
+    // 'suppress' block was closed thus the destructor for
+    // 'Suppressor' was not waiting/removing the signal via 'sigwait'.
+    // It also appeared that the signal would be delivered to another
+    // thread even if it remained blocked in this thread. The
+    // workaround here is to check explicitly for EPIPE and then do
+    // 'sigwait' regardless of what 'os::signals::pending' returns. We
+    // don't have that luxury with 'Suppressor' and arbitrary signals
+    // because we don't always have something like EPIPE to tell us
+    // that a signal is (or will soon be) pending.
+    bool pending = os::signals::pending(SIGPIPE);
+    bool unblock = !pending ? os::signals::block(SIGPIPE) : false;
+
+    ssize_t length = ::write(fd, data, size);
+
+    // Save the errno so we can restore it after doing sig* functions
+    // below.
+    int errno_ = errno;
+
+    if (length < 0 && errno_ == EPIPE && !pending) {
+      sigset_t mask;
+      sigemptyset(&mask);
+      sigaddset(&mask, SIGPIPE);
+
+      // 'sigwait' reports failure through its return value (an error
+      // number) rather than by returning -1 and setting errno, so the
+      // return value is what we have to inspect in order to retry.
+      int result;
+      do {
+        int ignored;
+        result = sigwait(&mask, &ignored);
+      } while (result == EINTR);
+    }
+
+    if (unblock) {
+      os::signals::unblock(SIGPIPE);
+    }
+
+    errno = errno_;
+
+    if (length < 0) {
+      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+        // Restart the write operation.
+        Future<short> future =
+          io::poll(fd, process::io::WRITE).onAny(
+              lambda::bind(&internal::write,
+                           fd,
+                           data,
+                           size,
+                           promise,
+                           lambda::_1));
+
+        // Stop polling if a discard occurs on our future.
+        promise->future().onDiscard(
+            lambda::bind(&process::internal::discard<short>,
+                         WeakFuture<short>(future)));
+      } else {
+        // Error occurred.
+        promise->fail(strerror(errno));
+      }
+    } else {
+      // TODO(benh): Retry if 'length' is 0?
+      promise->set(length);
+    }
+  }
+}
+
+} // namespace internal {
+
+
+// Asynchronously reads up to 'size' bytes from 'fd' into 'data'. The
+// returned future carries the number of bytes read. It is failed
+// immediately when 'fd' cannot be inspected or is not in non-blocking
+// mode.
+Future<size_t> read(int fd, void* data, size_t size)
+{
+  process::initialize();
+
+  // Validate the file descriptor before doing any work.
+  const Try<bool> nonblocking = os::isNonblock(fd);
+
+  if (nonblocking.isError()) {
+    // The file descriptor is not valid (e.g., has been closed).
+    return Failure(
+        "Failed to check if file descriptor was non-blocking: " +
+        nonblocking.error());
+  }
+
+  if (!nonblocking.get()) {
+    // The caller handed us a blocking file descriptor.
+    return Failure("Expected a non-blocking file descriptor");
+  }
+
+  memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
+
+  // Since the descriptor is non-blocking we attempt the read right
+  // away; internal::read falls back to polling only if it has to,
+  // which avoids unnecessary polling. We also observed that for some
+  // combination of libev and Linux kernel versions, the poll would
+  // block for non-deterministically long periods of time. This may be
+  // fixed in a newer version of libev (we use 3.8 at the time of
+  // writing this comment).
+  internal::read(fd, data, size, promise, io::READ);
+
+  return promise->future();
+}
+
+
+// Asynchronously writes up to 'size' bytes from 'data' to 'fd'. The
+// returned future carries the number of bytes written. It is failed
+// immediately when 'fd' cannot be inspected or is not in non-blocking
+// mode.
+Future<size_t> write(int fd, void* data, size_t size)
+{
+  process::initialize();
+
+  // Validate the file descriptor before doing any work.
+  const Try<bool> nonblocking = os::isNonblock(fd);
+
+  if (nonblocking.isError()) {
+    // The file descriptor is not valid (e.g., has been closed).
+    return Failure(
+        "Failed to check if file descriptor was non-blocking: " +
+        nonblocking.error());
+  }
+
+  if (!nonblocking.get()) {
+    // The caller handed us a blocking file descriptor.
+    return Failure("Expected a non-blocking file descriptor");
+  }
+
+  memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
+
+  // Since the descriptor is non-blocking we attempt the write right
+  // away; internal::write falls back to polling only if it has to,
+  // which avoids unnecessary polling. We also observed that for some
+  // combination of libev and Linux kernel versions, the poll would
+  // block for non-deterministically long periods of time. This may be
+  // fixed in a newer version of libev (we use 3.8 at the time of
+  // writing this comment).
+  internal::write(fd, data, size, promise, io::WRITE);
+
+  return promise->future();
+}
+
+
+namespace internal {
+
+#if __cplusplus >= 201103L
+// Repeatedly reads up to 'length' bytes from 'fd' into 'data' and
+// appends what was read to 'buffer'. Once a read yields zero bytes
+// (EOF) the returned future is completed with a copy of 'buffer'.
+Future<string> _read(
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length)
+{
+  return io::read(fd, data.get(), length)
+    .then([=] (size_t size) -> Future<string> {
+      if (size > 0) {
+        // Got some bytes: stash them and keep going.
+        buffer->append(data.get(), size);
+        return _read(fd, buffer, data, length);
+      }
+
+      // EOF.
+      return string(*buffer);
+    });
+}
+#else
+// Forward declaration.
+Future<string> _read(
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length);
+
+
+// Continuation of a single io::read issued by _read: 'size' is the
+// number of bytes that read produced. Zero means EOF, in which case a
+// copy of 'buffer' is returned; otherwise the bytes are appended to
+// 'buffer' and reading continues.
+Future<string> __read(
+    size_t size,
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length)
+{
+  if (size > 0) {
+    buffer->append(data.get(), size);
+    return _read(fd, buffer, data, length);
+  }
+
+  // EOF.
+  return string(*buffer);
+}
+
+
+// Issues one io::read of up to 'length' bytes from 'fd' into 'data' and
+// hands the result to __read.
+Future<string> _read(
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length)
+{
+  return io::read(fd, data.get(), length)
+    .then(lambda::bind(&__read, lambda::_1, fd, buffer, data, length));
+}
+#endif // __cplusplus >= 201103L
+
+
+#if __cplusplus >= 201103L
+// Writes the bytes of 'data' starting at offset 'index' to 'fd',
+// re-issuing the write for the remainder after every partial write.
+// The returned future is completed once the total written equals
+// data->size().
+Future<Nothing> _write(
+    int fd,
+    Owned<string> data,
+    size_t index)
+{
+  char* start = const_cast<char*>(data->data()) + index;
+  const size_t remaining = data->size() - index;
+
+  return io::write(fd, start, remaining)
+    .then([=] (size_t length) -> Future<Nothing> {
+      const size_t written = index + length;
+
+      if (written != data->size()) {
+        // Partial write: continue from where this one stopped.
+        return _write(fd, data, written);
+      }
+
+      return Nothing();
+    });
+}
+#else
+// Forward declaration.
+Future<Nothing> _write(
+    int fd,
+    Owned<string> data,
+    size_t index);
+
+
+// Continuation of a single io::write issued by _write: 'length' is the
+// number of bytes that write accepted starting at offset 'index'.
+Future<Nothing> __write(
+    int fd,
+    Owned<string> data,
+    size_t index,
+    size_t length)
+{
+  const size_t written = index + length;
+
+  if (written != data->size()) {
+    // Partial write: continue from where this one stopped.
+    return _write(fd, data, written);
+  }
+
+  return Nothing();
+}
+
+
+// Issues one io::write for the bytes of 'data' from offset 'index'
+// onwards and hands the result to __write.
+Future<Nothing> _write(
+    int fd,
+    Owned<string> data,
+    size_t index)
+{
+  char* start = const_cast<char*>(data->data()) + index;
+  const size_t remaining = data->size() - index;
+
+  return io::write(fd, start, remaining)
+    .then(lambda::bind(&__write, fd, data, index, lambda::_1));
+}
+#endif // __cplusplus >= 201103L
+
+
+#if __cplusplus >= 201103L
+// Performs one step of a splice: reads up to 'chunk' bytes from 'from'
+// into 'data', writes whatever was read to 'to', and then invokes
+// itself again once that write is ready. 'promise' is set when a read
+// returns zero bytes (EOF), failed when a read or write fails, and
+// discarded when a discard was requested or a read/write is discarded.
+void _splice(
+ int from,
+ int to,
+ size_t chunk,
+ boost::shared_array<char> data,
+ memory::shared_ptr<Promise<Nothing>> promise)
+{
+ // Stop splicing if a discard occurred on our future.
+ if (promise->future().hasDiscard()) {
+ // TODO(benh): Consider returning the number of bytes already
+ // spliced on discarded, or a failure. Same for the 'onDiscarded'
+ // callbacks below.
+ promise->discard();
+ return;
+ }
+
+ // Note that only one of io::read or io::write is outstanding at any
+ // one point in time thus the reuse of 'data' for both operations.
+
+ Future<size_t> read = io::read(from, data.get(), chunk);
+
+ // Stop reading (or potentially indefinitely polling) if a discard
+ // occurs on our future.
+ promise->future().onDiscard(
+ lambda::bind(&process::internal::discard<size_t>,
+ WeakFuture<size_t>(read)));
+
+ read
+ .onReady([=] (size_t size) {
+ if (size == 0) { // EOF.
+ promise->set(Nothing());
+ } else {
+ // Note that we always try and complete the write, even if a
+ // discard has occurred on our future, in order to provide
+ // semantics where everything read is written. The promise
+ // will eventually be discarded in the next read.
+ io::write(to, string(data.get(), size))
+ .onReady([=] () { _splice(from, to, chunk, data, promise); })
+ .onFailed([=] (const string& message) { promise->fail(message); })
+ .onDiscarded([=] () { promise->discard(); });
+ }
+ })
+ .onFailed([=] (const string& message) { promise->fail(message); })
+ .onDiscarded([=] () { promise->discard(); });
+}
+#else
+// Forward declarations.
+void __splice(
+ int from,
+ int to,
+ size_t chunk,
+ boost::shared_array<char> data,
+ memory::shared_ptr<Promise<Nothing> > promise,
+ size_t size);
+
+void ___splice(
+ memory::shared_ptr<Promise<Nothing> > promise,
+ const string& message);
+
+void ____splice(
+ memory::shared_ptr<Promise<Nothing> > promise);
+
+
+// Performs the read half of one splice step: reads up to 'chunk' bytes
+// from 'from' into 'data' and dispatches the outcome to __splice (read
+// ready), ___splice (read failed) or ____splice (read discarded).
+void _splice(
+ int from,
+ int to,
+ size_t chunk,
+ boost::shared_array<char> data,
+ memory::shared_ptr<Promise<Nothing> > promise)
+{
+ // Stop splicing if a discard occurred on our future.
+ if (promise->future().hasDiscard()) {
+ // TODO(benh): Consider returning the number of bytes already
+ // spliced on discarded, or a failure. Same for the 'onDiscarded'
+ // callbacks below.
+ promise->discard();
+ return;
+ }
+
+ Future<size_t> read = io::read(from, data.get(), chunk);
+
+ // Stop reading (or potentially indefinitely polling) if a discard
+ // occurs on our future.
+ promise->future().onDiscard(
+ lambda::bind(&process::internal::discard<size_t>,
+ WeakFuture<size_t>(read)));
+
+ read
+ .onReady(
+ lambda::bind(&__splice, from, to, chunk, data, promise, lambda::_1))
+ .onFailed(lambda::bind(&___splice, promise, lambda::_1))
+ .onDiscarded(lambda::bind(&____splice, promise));
+}
+
+
+// Performs the write half of one splice step: 'size' is the number of
+// bytes the preceding read placed in 'data'. Zero means EOF and
+// completes 'promise'; otherwise the bytes are written to 'to' and
+// _splice is invoked again once the write is ready.
+void __splice(
+ int from,
+ int to,
+ size_t chunk,
+ boost::shared_array<char> data,
+ memory::shared_ptr<Promise<Nothing> > promise,
+ size_t size)
+{
+ if (size == 0) { // EOF.
+ promise->set(Nothing());
+ } else {
+ // Note that we always try and complete the write, even if a
+ // discard has occurred on our future, in order to provide
+ // semantics where everything read is written. The promise will
+ // eventually be discarded in the next read.
+ io::write(to, string(data.get(), size))
+ .onReady(lambda::bind(&_splice, from, to, chunk, data, promise))
+ .onFailed(lambda::bind(&___splice, promise, lambda::_1))
+ .onDiscarded(lambda::bind(&____splice, promise));
+ }
+}
+
+
+// Failure callback shared by the read and write halves: fails 'promise'
+// with 'message'.
+void ___splice(
+ memory::shared_ptr<Promise<Nothing> > promise,
+ const string& message)
+{
+ promise->fail(message);
+}
+
+
+// Discard callback shared by the read and write halves: discards
+// 'promise'.
+void ____splice(
+ memory::shared_ptr<Promise<Nothing> > promise)
+{
+ promise->discard();
+}
+#endif // __cplusplus >= 201103L
+
+
+// Starts copying from 'from' to 'to' using a buffer of 'chunk' bytes
+// and returns a future for the overall operation.
+Future<Nothing> splice(int from, int to, size_t chunk)
+{
+  // A single explicit promise is handed from one internal::_splice
+  // step to the next, rather than having internal::_splice return a
+  // future built from a chain of io::read and io::write calls, so that
+  // memory usage does not increase the longer that we splice.
+  memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
+  const Future<Nothing> result = promise->future();
+
+  // Scratch space shared by every step of the splice.
+  boost::shared_array<char> buffer(new char[chunk]);
+
+  _splice(from, to, chunk, buffer, promise);
+
+  return result;
+}
+
+} // namespace internal {
+
+
+// Asynchronously reads from 'fd' until EOF and returns everything that
+// was read. The work is done on a private duplicate of 'fd' which is
+// marked close-on-exec and non-blocking here and closed once the
+// returned future transitions.
+Future<string> read(int fd)
+{
+  process::initialize();
+
+  // Reject obviously invalid descriptors before attempting to dup.
+  if (fd < 0) {
+    return Failure(strerror(EBADF));
+  }
+
+  // Work on our own copy of the file descriptor so that we control its
+  // lifetime and don't crash if/when someone accidentally closes the
+  // original before discarding this future.
+  const int duplicate = dup(fd);
+  if (duplicate == -1) {
+    return Failure(ErrnoError("Failed to duplicate file descriptor"));
+  }
+
+  // Don't let the duplicate leak into exec'ed children.
+  const Try<Nothing> cloexec = os::cloexec(duplicate);
+  if (cloexec.isError()) {
+    os::close(duplicate);
+    return Failure(
+        "Failed to set close-on-exec on duplicated file descriptor: " +
+        cloexec.error());
+  }
+
+  // io::read(fd, data, size) requires a non-blocking file descriptor.
+  const Try<Nothing> nonblock = os::nonblock(duplicate);
+  if (nonblock.isError()) {
+    os::close(duplicate);
+    return Failure(
+        "Failed to make duplicated file descriptor non-blocking: " +
+        nonblock.error());
+  }
+
+  // TODO(benh): Wrap up this data as a struct, use 'Owned'.
+  // TODO(bmahler): For efficiency, use a rope for the buffer.
+  memory::shared_ptr<string> accumulated(new string());
+  boost::shared_array<char> scratch(new char[BUFFERED_READ_SIZE]);
+
+  return internal::_read(duplicate, accumulated, scratch, BUFFERED_READ_SIZE)
+    .onAny(lambda::bind(&os::close, duplicate));
+}
+
+
+// Asynchronously writes all of 'data' to 'fd'. The work is done on a
+// private duplicate of 'fd' which is marked close-on-exec and
+// non-blocking here and closed once the returned future transitions.
+Future<Nothing> write(int fd, const std::string& data)
+{
+  process::initialize();
+
+  // Reject obviously invalid descriptors before attempting to dup.
+  if (fd < 0) {
+    return Failure(strerror(EBADF));
+  }
+
+  // Work on our own copy of the file descriptor so that we control its
+  // lifetime and don't crash if/when someone accidentally closes the
+  // original before discarding this future.
+  const int duplicate = dup(fd);
+  if (duplicate == -1) {
+    return Failure(ErrnoError("Failed to duplicate file descriptor"));
+  }
+
+  // Don't let the duplicate leak into exec'ed children.
+  const Try<Nothing> cloexec = os::cloexec(duplicate);
+  if (cloexec.isError()) {
+    os::close(duplicate);
+    return Failure(
+        "Failed to set close-on-exec on duplicated file descriptor: " +
+        cloexec.error());
+  }
+
+  // io::write(fd, data, size) requires a non-blocking file descriptor.
+  const Try<Nothing> nonblock = os::nonblock(duplicate);
+  if (nonblock.isError()) {
+    os::close(duplicate);
+    return Failure(
+        "Failed to make duplicated file descriptor non-blocking: " +
+        nonblock.error());
+  }
+
+  // The write continues after we return, so it needs its own copy of
+  // the bytes.
+  Owned<string> copy(new string(data));
+
+  return internal::_write(duplicate, copy, 0)
+    .onAny(lambda::bind(&os::close, duplicate));
+}
+
+
+// Splices everything readable from 'from' into 'to', or into /dev/null
+// when 'to' is None, using a buffer of 'chunk' bytes. Both ends are
+// private descriptors (duplicates, or a freshly opened /dev/null) that
+// are marked close-on-exec and non-blocking here and closed once the
+// splice future transitions. Every early failure return closes
+// whatever descriptors have been acquired up to that point.
+Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
+{
+  // Make sure we've got "valid" file descriptors.
+  if (from < 0 || (to.isSome() && to.get() < 0)) {
+    return Failure(strerror(EBADF));
+  }
+
+  if (to.isNone()) {
+    // Open up /dev/null that we can splice into.
+    Try<int> open = os::open("/dev/null", O_WRONLY);
+
+    if (open.isError()) {
+      return Failure("Failed to open /dev/null for writing: " + open.error());
+    }
+
+    to = open.get();
+  } else {
+    // Duplicate 'to' so that we're in control of its lifetime.
+    int fd = dup(to.get());
+    if (fd == -1) {
+      return Failure(ErrnoError("Failed to duplicate 'to' file descriptor"));
+    }
+
+    to = fd;
+  }
+
+  CHECK_SOME(to);
+
+  // Duplicate 'from' so that we're in control of its lifetime.
+  from = dup(from);
+  if (from == -1) {
+    // Build the error first (it reads errno), then release the 'to'
+    // descriptor we own so that it is not leaked on this path.
+    const ErrnoError error("Failed to duplicate 'from' file descriptor");
+    os::close(to.get());
+    return Failure(error);
+  }
+
+  // Set the close-on-exec flag (no-op if already set).
+  Try<Nothing> cloexec = os::cloexec(from);
+  if (cloexec.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to set close-on-exec on 'from': " + cloexec.error());
+  }
+
+  cloexec = os::cloexec(to.get());
+  if (cloexec.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to set close-on-exec on 'to': " + cloexec.error());
+  }
+
+  // Make the file descriptors non-blocking (no-op if already set).
+  Try<Nothing> nonblock = os::nonblock(from);
+  if (nonblock.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to make 'from' non-blocking: " + nonblock.error());
+  }
+
+  nonblock = os::nonblock(to.get());
+  if (nonblock.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to make 'to' non-blocking: " + nonblock.error());
+  }
+
+  // Both descriptors are ours; close them whichever way the splice ends.
+  return internal::splice(from, to.get(), chunk)
+    .onAny(lambda::bind(&os::close, from))
+    .onAny(lambda::bind(&os::close, to.get()));
+}
+
+} // namespace io {
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/37bba65e/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 1fc8874..aeaac0c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -41,8 +41,6 @@
#include <stdexcept>
#include <vector>
-#include <boost/shared_array.hpp>
-
#include <process/check.hpp>
#include <process/clock.hpp>
#include <process/defer.hpp>
@@ -3115,637 +3113,6 @@ void post(const UPID& from,
}
-namespace io {
-namespace internal {
-
-void read(
- int fd,
- void* data,
- size_t size,
- const memory::shared_ptr<Promise<size_t> >& promise,
- const Future<short>& future)
-{
- // Ignore this function if the read operation has been discarded.
- if (promise->future().hasDiscard()) {
- CHECK(!future.isPending());
- promise->discard();
- return;
- }
-
- if (size == 0) {
- promise->set(0);
- return;
- }
-
- if (future.isDiscarded()) {
- promise->fail("Failed to poll: discarded future");
- } else if (future.isFailed()) {
- promise->fail(future.failure());
- } else {
- ssize_t length = ::read(fd, data, size);
- if (length < 0) {
- if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
- // Restart the read operation.
- Future<short> future =
- io::poll(fd, process::io::READ).onAny(
- lambda::bind(&internal::read,
- fd,
- data,
- size,
- promise,
- lambda::_1));
-
- // Stop polling if a discard occurs on our future.
- promise->future().onDiscard(
- lambda::bind(&process::internal::discard<short>,
- WeakFuture<short>(future)));
- } else {
- // Error occurred.
- promise->fail(strerror(errno));
- }
- } else {
- promise->set(length);
- }
- }
-}
-
-
-void write(
- int fd,
- void* data,
- size_t size,
- const memory::shared_ptr<Promise<size_t> >& promise,
- const Future<short>& future)
-{
- // Ignore this function if the write operation has been discarded.
- if (promise->future().hasDiscard()) {
- promise->discard();
- return;
- }
-
- if (size == 0) {
- promise->set(0);
- return;
- }
-
- if (future.isDiscarded()) {
- promise->fail("Failed to poll: discarded future");
- } else if (future.isFailed()) {
- promise->fail(future.failure());
- } else {
- // Do a write but ignore SIGPIPE so we can return an error when
- // writing to a pipe or socket where the reading end is closed.
- // TODO(benh): The 'suppress' macro failed to work on OS X as it
- // appears that signal delivery was happening asynchronously.
- // That is, the signal would not appear to be pending when the
- // 'suppress' block was closed thus the destructor for
- // 'Suppressor' was not waiting/removing the signal via 'sigwait'.
- // It also appeared that the signal would be delivered to another
- // thread even if it remained blocked in this thiread. The
- // workaround here is to check explicitly for EPIPE and then do
- // 'sigwait' regardless of what 'os::signals::pending' returns. We
- // don't have that luxury with 'Suppressor' and arbitrary signals
- // because we don't always have something like EPIPE to tell us
- // that a signal is (or will soon be) pending.
- bool pending = os::signals::pending(SIGPIPE);
- bool unblock = !pending ? os::signals::block(SIGPIPE) : false;
-
- ssize_t length = ::write(fd, data, size);
-
- // Save the errno so we can restore it after doing sig* functions
- // below.
- int errno_ = errno;
-
- if (length < 0 && errno == EPIPE && !pending) {
- sigset_t mask;
- sigemptyset(&mask);
- sigaddset(&mask, SIGPIPE);
-
- int result;
- do {
- int ignored;
- result = sigwait(&mask, &ignored);
- } while (result == -1 && errno == EINTR);
- }
-
- if (unblock) {
- os::signals::unblock(SIGPIPE);
- }
-
- errno = errno_;
-
- if (length < 0) {
- if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
- // Restart the write operation.
- Future<short> future =
- io::poll(fd, process::io::WRITE).onAny(
- lambda::bind(&internal::write,
- fd,
- data,
- size,
- promise,
- lambda::_1));
-
- // Stop polling if a discard occurs on our future.
- promise->future().onDiscard(
- lambda::bind(&process::internal::discard<short>,
- WeakFuture<short>(future)));
- } else {
- // Error occurred.
- promise->fail(strerror(errno));
- }
- } else {
- // TODO(benh): Retry if 'length' is 0?
- promise->set(length);
- }
- }
-}
-
-} // namespace internal {
-
-
-Future<size_t> read(int fd, void* data, size_t size)
-{
- process::initialize();
-
- memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
-
- // Check the file descriptor.
- Try<bool> nonblock = os::isNonblock(fd);
- if (nonblock.isError()) {
- // The file descriptor is not valid (e.g., has been closed).
- promise->fail(
- "Failed to check if file descriptor was non-blocking: " +
- nonblock.error());
- return promise->future();
- } else if (!nonblock.get()) {
- // The file descriptor is not non-blocking.
- promise->fail("Expected a non-blocking file descriptor");
- return promise->future();
- }
-
- // Because the file descriptor is non-blocking, we call read()
- // immediately. The read may in turn call poll if necessary,
- // avoiding unnecessary polling. We also observed that for some
- // combination of libev and Linux kernel versions, the poll would
- // block for non-deterministically long periods of time. This may be
- // fixed in a newer version of libev (we use 3.8 at the time of
- // writing this comment).
- internal::read(fd, data, size, promise, io::READ);
-
- return promise->future();
-}
-
-
-Future<size_t> write(int fd, void* data, size_t size)
-{
- process::initialize();
-
- memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
-
- // Check the file descriptor.
- Try<bool> nonblock = os::isNonblock(fd);
- if (nonblock.isError()) {
- // The file descriptor is not valid (e.g., has been closed).
- promise->fail(
- "Failed to check if file descriptor was non-blocking: " +
- nonblock.error());
- return promise->future();
- } else if (!nonblock.get()) {
- // The file descriptor is not non-blocking.
- promise->fail("Expected a non-blocking file descriptor");
- return promise->future();
- }
-
- // Because the file descriptor is non-blocking, we call write()
- // immediately. The write may in turn call poll if necessary,
- // avoiding unnecessary polling. We also observed that for some
- // combination of libev and Linux kernel versions, the poll would
- // block for non-deterministically long periods of time. This may be
- // fixed in a newer version of libev (we use 3.8 at the time of
- // writing this comment).
- internal::write(fd, data, size, promise, io::WRITE);
-
- return promise->future();
-}
-
-
-namespace internal {
-
-#if __cplusplus >= 201103L
-Future<string> _read(
- int fd,
- const memory::shared_ptr<string>& buffer,
- const boost::shared_array<char>& data,
- size_t length)
-{
- return io::read(fd, data.get(), length)
- .then([=] (size_t size) -> Future<string> {
- if (size == 0) { // EOF.
- return string(*buffer);
- }
- buffer->append(data.get(), size);
- return _read(fd, buffer, data, length);
- });
-}
-#else
-// Forward declataion.
-Future<string> _read(
- int fd,
- const memory::shared_ptr<string>& buffer,
- const boost::shared_array<char>& data,
- size_t length);
-
-
-Future<string> __read(
- size_t size,
- int fd,
- const memory::shared_ptr<string>& buffer,
- const boost::shared_array<char>& data,
- size_t length)
-{
- if (size == 0) { // EOF.
- return string(*buffer);
- }
-
- buffer->append(data.get(), size);
-
- return _read(fd, buffer, data, length);
-}
-
-
-Future<string> _read(
- int fd,
- const memory::shared_ptr<string>& buffer,
- const boost::shared_array<char>& data,
- size_t length)
-{
- return io::read(fd, data.get(), length)
- .then(lambda::bind(&__read, lambda::_1, fd, buffer, data, length));
-}
-#endif // __cplusplus >= 201103L
-
-
-#if __cplusplus >= 201103L
-Future<Nothing> _write(
- int fd,
- Owned<string> data,
- size_t index)
-{
- return io::write(fd, (void*) (data->data() + index), data->size() - index)
- .then([=] (size_t length) -> Future<Nothing> {
- if (index + length == data->size()) {
- return Nothing();
- }
- return _write(fd, data, index + length);
- });
-}
-#else
-// Forward declaration.
-Future<Nothing> _write(
- int fd,
- Owned<string> data,
- size_t index);
-
-
-Future<Nothing> __write(
- int fd,
- Owned<string> data,
- size_t index,
- size_t length)
-{
- if (index + length == data->size()) {
- return Nothing();
- }
- return _write(fd, data, index + length);
-}
-
-
-Future<Nothing> _write(
- int fd,
- Owned<string> data,
- size_t index)
-{
- return io::write(fd, (void*) (data->data() + index), data->size() - index)
- .then(lambda::bind(&__write, fd, data, index, lambda::_1));
-}
-#endif // __cplusplus >= 201103L
-
-
-#if __cplusplus >= 201103L
-void _splice(
- int from,
- int to,
- size_t chunk,
- boost::shared_array<char> data,
- memory::shared_ptr<Promise<Nothing>> promise)
-{
- // Stop splicing if a discard occured on our future.
- if (promise->future().hasDiscard()) {
- // TODO(benh): Consider returning the number of bytes already
- // spliced on discarded, or a failure. Same for the 'onDiscarded'
- // callbacks below.
- promise->discard();
- return;
- }
-
- // Note that only one of io::read or io::write is outstanding at any
- // one point in time thus the reuse of 'data' for both operations.
-
- Future<size_t> read = io::read(from, data.get(), chunk);
-
- // Stop reading (or potentially indefinitely polling) if a discard
- // occcurs on our future.
- promise->future().onDiscard(
- lambda::bind(&process::internal::discard<size_t>,
- WeakFuture<size_t>(read)));
-
- read
- .onReady([=] (size_t size) {
- if (size == 0) { // EOF.
- promise->set(Nothing());
- } else {
- // Note that we always try and complete the write, even if a
- // discard has occured on our future, in order to provide
- // semantics where everything read is written. The promise
- // will eventually be discarded in the next read.
- io::write(to, string(data.get(), size))
- .onReady([=] () { _splice(from, to, chunk, data, promise); })
- .onFailed([=] (const string& message) { promise->fail(message); })
- .onDiscarded([=] () { promise->discard(); });
- }
- })
- .onFailed([=] (const string& message) { promise->fail(message); })
- .onDiscarded([=] () { promise->discard(); });
-}
-#else
-// Forward declarations.
-void __splice(
- int from,
- int to,
- size_t chunk,
- boost::shared_array<char> data,
- memory::shared_ptr<Promise<Nothing> > promise,
- size_t size);
-
-void ___splice(
- memory::shared_ptr<Promise<Nothing> > promise,
- const string& message);
-
-void ____splice(
- memory::shared_ptr<Promise<Nothing> > promise);
-
-
-void _splice(
- int from,
- int to,
- size_t chunk,
- boost::shared_array<char> data,
- memory::shared_ptr<Promise<Nothing> > promise)
-{
- // Stop splicing if a discard occured on our future.
- if (promise->future().hasDiscard()) {
- // TODO(benh): Consider returning the number of bytes already
- // spliced on discarded, or a failure. Same for the 'onDiscarded'
- // callbacks below.
- promise->discard();
- return;
- }
-
- Future<size_t> read = io::read(from, data.get(), chunk);
-
- // Stop reading (or potentially indefinitely polling) if a discard
- // occurs on our future.
- promise->future().onDiscard(
- lambda::bind(&process::internal::discard<size_t>,
- WeakFuture<size_t>(read)));
-
- read
- .onReady(
- lambda::bind(&__splice, from, to, chunk, data, promise, lambda::_1))
- .onFailed(lambda::bind(&___splice, promise, lambda::_1))
- .onDiscarded(lambda::bind(&____splice, promise));
-}
-
-
-void __splice(
- int from,
- int to,
- size_t chunk,
- boost::shared_array<char> data,
- memory::shared_ptr<Promise<Nothing> > promise,
- size_t size)
-{
- if (size == 0) { // EOF.
- promise->set(Nothing());
- } else {
- // Note that we always try and complete the write, even if a
- // discard has occured on our future, in order to provide
- // semantics where everything read is written. The promise will
- // eventually be discarded in the next read.
- io::write(to, string(data.get(), size))
- .onReady(lambda::bind(&_splice, from, to, chunk, data, promise))
- .onFailed(lambda::bind(&___splice, promise, lambda::_1))
- .onDiscarded(lambda::bind(&____splice, promise));
- }
-}
-
-
-void ___splice(
- memory::shared_ptr<Promise<Nothing> > promise,
- const string& message)
-{
- promise->fail(message);
-}
-
-
-void ____splice(
- memory::shared_ptr<Promise<Nothing> > promise)
-{
- promise->discard();
-}
-#endif // __cplusplus >= 201103L
-
-
-// Kicks off a splice from 'from' to 'to' using a buffer of 'chunk'
-// bytes and returns the future of the promise that the splice steps
-// complete.
-Future<Nothing> splice(int from, int to, size_t chunk)
-{
-  // We hand an explicit promise from step to step instead of having
-  // internal::_splice return a future built from a chain of io::read
-  // and io::write calls, so that memory usage does not grow the longer
-  // we splice.
-  memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
-
-  // Grab the future before starting the first step.
-  Future<Nothing> future = promise->future();
-
-  // The buffer is shared by every step of this splice.
-  _splice(
-      from,
-      to,
-      chunk,
-      boost::shared_array<char>(new char[chunk]),
-      promise);
-
-  return future;
-}
-
-} // namespace internal {
-
-
-// Starts a buffered read on a private duplicate of 'fd' and returns
-// the future produced by internal::_read. The duplicate is set
-// close-on-exec and non-blocking, and an onAny callback closes it.
-Future<string> read(int fd)
-{
-  process::initialize();
-
-  // Check that we've got a "valid" file descriptor before dup'ing.
-  if (fd < 0) {
-    return Failure(strerror(EBADF));
-  }
-
-  // Operate on our own copy of the file descriptor so that we control
-  // its lifetime and don't crash if/when someone accidentally closes
-  // the original before discarding this future.
-  const int owned = dup(fd);
-  if (owned == -1) {
-    return Failure(ErrnoError("Failed to duplicate file descriptor"));
-  }
-
-  // The copy must not be inherited across exec.
-  Try<Nothing> cloexec = os::cloexec(owned);
-  if (cloexec.isError()) {
-    os::close(owned);
-    return Failure(
-        "Failed to set close-on-exec on duplicated file descriptor: " +
-        cloexec.error());
-  }
-
-  // The copy must be non-blocking.
-  Try<Nothing> nonblock = os::nonblock(owned);
-  if (nonblock.isError()) {
-    os::close(owned);
-    return Failure(
-        "Failed to make duplicated file descriptor non-blocking: " +
-        nonblock.error());
-  }
-
-  // TODO(benh): Wrap up this data as a struct, use 'Owner'.
-  // TODO(bmahler): For efficiency, use a rope for the buffer.
-  boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
-  memory::shared_ptr<string> buffer(new string());
-
-  return internal::_read(owned, buffer, data, BUFFERED_READ_SIZE)
-    .onAny(lambda::bind(&os::close, owned));
-}
-
-
-// Starts writing a copy of 'data' to a private duplicate of 'fd' and
-// returns the future produced by internal::_write. The duplicate is set
-// close-on-exec and non-blocking, and an onAny callback closes it.
-Future<Nothing> write(int fd, const std::string& data)
-{
-  process::initialize();
-
-  // Check that we've got a "valid" file descriptor before dup'ing.
-  if (fd < 0) {
-    return Failure(strerror(EBADF));
-  }
-
-  // Operate on our own copy of the file descriptor so that we control
-  // its lifetime and don't crash if/when someone accidentally closes
-  // the original before discarding this future.
-  const int owned = dup(fd);
-  if (owned == -1) {
-    return Failure(ErrnoError("Failed to duplicate file descriptor"));
-  }
-
-  // The copy must not be inherited across exec.
-  Try<Nothing> cloexec = os::cloexec(owned);
-  if (cloexec.isError()) {
-    os::close(owned);
-    return Failure(
-        "Failed to set close-on-exec on duplicated file descriptor: " +
-        cloexec.error());
-  }
-
-  // The copy must be non-blocking.
-  Try<Nothing> nonblock = os::nonblock(owned);
-  if (nonblock.isError()) {
-    os::close(owned);
-    return Failure(
-        "Failed to make duplicated file descriptor non-blocking: " +
-        nonblock.error());
-  }
-
-  // The write operates on its own copy of the data, from offset 0.
-  Owned<string> copy(new string(data));
-
-  return internal::_write(owned, copy, 0)
-    .onAny(lambda::bind(&os::close, owned));
-}
-
-
-// Splices 'from' into 'to' (or into /dev/null when 'to' is None) using
-// a buffer of 'chunk' bytes, and returns the future of that splice.
-//
-// Both descriptors are duplicated (or, for /dev/null, freshly opened)
-// so that this function controls their lifetime; the copies are set
-// close-on-exec and non-blocking, and onAny callbacks close them.
-// Every early-return path closes whichever copies were already
-// acquired.
-//
-// Returns a Failure if either descriptor is negative, or if opening,
-// duplicating or configuring a descriptor fails.
-Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
-{
-  // Make sure we've got "valid" file descriptors.
-  if (from < 0 || (to.isSome() && to.get() < 0)) {
-    return Failure(strerror(EBADF));
-  }
-
-  if (to.isNone()) {
-    // Open up /dev/null that we can splice into.
-    Try<int> open = os::open("/dev/null", O_WRONLY);
-
-    if (open.isError()) {
-      return Failure("Failed to open /dev/null for writing: " + open.error());
-    }
-
-    to = open.get();
-  } else {
-    // Duplicate 'to' so that we're in control of its lifetime.
-    int fd = dup(to.get());
-    if (fd == -1) {
-      return Failure(ErrnoError("Failed to duplicate 'to' file descriptor"));
-    }
-
-    to = fd;
-  }
-
-  CHECK_SOME(to);
-
-  // Duplicate 'from' so that we're in control of its lifetime.
-  from = dup(from);
-  if (from == -1) {
-    // Build the error first, because closing below may overwrite errno.
-    // Then release our copy of 'to', which would otherwise leak.
-    const ErrnoError error("Failed to duplicate 'from' file descriptor");
-    os::close(to.get());
-    return Failure(error);
-  }
-
-  // Set the close-on-exec flag (no-op if already set).
-  Try<Nothing> cloexec = os::cloexec(from);
-  if (cloexec.isError()) {
-    os::close(from);
-    os::close(to.get());
-    return Failure("Failed to set close-on-exec on 'from': " + cloexec.error());
-  }
-
-  cloexec = os::cloexec(to.get());
-  if (cloexec.isError()) {
-    os::close(from);
-    os::close(to.get());
-    return Failure("Failed to set close-on-exec on 'to': " + cloexec.error());
-  }
-
-  // Make the file descriptors non-blocking (no-op if already set).
-  Try<Nothing> nonblock = os::nonblock(from);
-  if (nonblock.isError()) {
-    os::close(from);
-    os::close(to.get());
-    return Failure("Failed to make 'from' non-blocking: " + nonblock.error());
-  }
-
-  nonblock = os::nonblock(to.get());
-  if (nonblock.isError()) {
-    os::close(from);
-    os::close(to.get());
-    return Failure("Failed to make 'to' non-blocking: " + nonblock.error());
-  }
-
-  // Both copies are closed once the splice's future transitions.
-  return internal::splice(from, to.get(), chunk)
-    .onAny(lambda::bind(&os::close, from))
-    .onAny(lambda::bind(&os::close, to.get()));
-}
-
-} // namespace io {
-
-
namespace inject {
bool exited(const UPID& from, const UPID& to)