You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/02/05 19:59:12 UTC
[1/5] git commit: Refactored io::splice for C++03.
Updated Branches:
refs/heads/master 129ad594b -> cbba5d178
Refactored io::splice for C++03.
Review: https://reviews.apache.org/r/17621
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cbba5d17
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cbba5d17
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cbba5d17
Branch: refs/heads/master
Commit: cbba5d178c6d49040a8bb62566f024af2bf0089c
Parents: f42e608
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jan 31 13:58:21 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Feb 5 10:58:36 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 57 ++++++++++++++++++++++++++++++--
1 file changed, 55 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/cbba5d17/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index e422384..2e7764a 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -3782,7 +3782,7 @@ void _splice(
int to,
size_t chunk,
const boost::shared_array<char>& data,
- Owned<Promise<Nothing>> promise)
+ memory::shared_ptr<Promise<Nothing>> promise)
{
// Note that only one of io::read or io::write is outstanding at any
// one point in time thus the reuse of 'data' for both operations.
@@ -3803,6 +3803,59 @@ void _splice(
.onDiscarded([=] () { promise->future().discard(); });
}
#else
+// Forward declarations.
+void __splice(
+ int from,
+ int to,
+ size_t chunk,
+ boost::shared_array<char> data,
+ memory::shared_ptr<Promise<Nothing> > promise,
+ size_t size);
+
+void ___splice(
+ memory::shared_ptr<Promise<Nothing> > promise,
+ const string& message);
+
+
+void _splice(
+ int from,
+ int to,
+ size_t chunk,
+ boost::shared_array<char> data,
+ memory::shared_ptr<Promise<Nothing> > promise)
+{
+ io::read(from, data.get(), chunk)
+ .onReady(lambda::bind(&__splice, from, to, chunk, data, promise, lambda::_1))
+ .onFailed(lambda::bind(&___splice, promise, lambda::_1))
+ .onDiscarded(lambda::bind(&Future<Nothing>::discard, promise->future()));
+}
+
+
+void __splice(
+ int from,
+ int to,
+ size_t chunk,
+ boost::shared_array<char> data,
+ memory::shared_ptr<Promise<Nothing> > promise,
+ size_t size)
+{
+ if (size == 0) { // EOF.
+ promise->set(Nothing());
+ } else {
+ io::write(to, string(data.get(), size))
+ .onReady(lambda::bind(&_splice, from, to, chunk, data, promise))
+ .onFailed(lambda::bind(&___splice, promise, lambda::_1))
+ .onDiscarded(lambda::bind(&Future<Nothing>::discard, promise->future()));
+ }
+}
+
+
+void ___splice(
+ memory::shared_ptr<Promise<Nothing> > promise,
+ const string& message)
+{
+ promise->fail(message);
+}
#endif // __cplusplus >= 201103L
} // namespace internal
@@ -3837,7 +3890,7 @@ Future<Nothing> splice(int from, int to, size_t chunk)
// implementing internal::_splice as a chain of io::read and
// io::write calls, we use an explicit promise that we pass around
// so that we don't increase memory usage the longer that we splice.
- Owned<Promise<Nothing> > promise(new Promise<Nothing>());
+ memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
Future<Nothing> future = promise->future();
[2/5] git commit: Refactored io::write for C++03.
Posted by be...@apache.org.
Refactored io::write for C++03.
Review: https://reviews.apache.org/r/17619
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/61ec7026
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/61ec7026
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/61ec7026
Branch: refs/heads/master
Commit: 61ec7026523fde25d9c403f42e89e83dde9d62df
Parents: 659a91f
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jan 31 13:14:31 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Feb 5 10:58:36 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 42 ++++++++++++++++++++++++++------
1 file changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/61ec7026/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 27765fa..bc111c6 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -3702,8 +3702,7 @@ Future<string> _read(
Future<string> __read(
- const size_t& size,
- // TODO(benh): Remove 'const &' after fixing libprocess.
+ size_t size,
int fd,
const memory::shared_ptr<string>& buffer,
const boost::shared_array<char>& data,
@@ -3745,6 +3744,35 @@ Future<Nothing> _write(
return _write(fd, data, index + length);
});
}
+#else
+// Forward declaration.
+Future<Nothing> _write(
+ int fd,
+ Owned<string> data,
+ size_t index);
+
+
+Future<Nothing> __write(
+ int fd,
+ Owned<string> data,
+ size_t index,
+ size_t length)
+{
+ if (index + length == data->size()) {
+ return Nothing();
+ }
+ return _write(fd, data, index + length);
+}
+
+
+Future<Nothing> _write(
+ int fd,
+ Owned<string> data,
+ size_t index)
+{
+ return io::write(fd, (void*) (data->data() + index), data->size() - index)
+ .then(lambda::bind(&__write, fd, data, index, lambda::_1));
+}
#endif // __cplusplus >= 201103L
} // namespace internal
@@ -3875,12 +3903,12 @@ Future<Response> request(
return Failure("Failed to set nonblock: " + nonblock.error());
}
+ // Need to disambiguate the io::read we want when binding below.
+ Future<string> (*read)(int) = io::read;
+
return io::write(s, out.str())
- .then([=] () {
- // Decode once the async read completes.
- return io::read(s)
- .then(lambda::bind(&internal::decode, lambda::_1));
- })
+ .then(lambda::bind(read, s))
+ .then(lambda::bind(&internal::decode, lambda::_1))
.onAny(lambda::bind(&os::close, s));
}
[5/5] git commit: Added io::write (in C++11).
Posted by be...@apache.org.
Added io::write (in C++11).
Review: https://reviews.apache.org/r/17618
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/659a91f0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/659a91f0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/659a91f0
Branch: refs/heads/master
Commit: 659a91f067df70c5009934010d8c5b3d6860aeb6
Parents: e24f541
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jan 31 12:56:00 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Feb 5 10:58:36 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/io.hpp | 35 +++-
3rdparty/libprocess/src/process.cpp | 248 +++++++++++++++++++-----
3rdparty/libprocess/src/tests/io_tests.cpp | 107 +++++++++-
3 files changed, 328 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/659a91f0/3rdparty/libprocess/include/process/io.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp
index 8cf3244..d16ab01 100644
--- a/3rdparty/libprocess/include/process/io.hpp
+++ b/3rdparty/libprocess/include/process/io.hpp
@@ -6,6 +6,8 @@
#include <process/future.hpp>
+#include <stout/nothing.hpp>
+
namespace process {
namespace io {
@@ -22,14 +24,16 @@ const size_t BUFFERED_READ_SIZE = 16*4096;
Future<short> poll(int fd, short events);
-// Performs a single non-blocking read by polling on the specified file
-// descriptor until any data can be be read. The future will become ready when
-// some data is read (may be less than that specified by size). A future failure
-// will be returned if an error is detected. If end-of-file is reached, value
-// zero will be returned. Note that the return type of this function differs
-// from the standard 'read'. In particular, this function returns the number of
-// bytes read or zero on end-of-file (an error is indicated by failing the
-// future, thus only a 'size_t' is necessary rather than a 'ssize_t').
+// Performs a single non-blocking read by polling on the specified
+// file descriptor until any data can be read. The future will
+// become ready when some data is read (may be less than that
+// specified by size). A failure will be returned if an error is
+// detected. If end-of-file is reached, value zero will be
+// returned. Note that the return type of this function differs from
+// the standard 'read'. In particular, this function returns the
+// number of bytes read or zero on end-of-file (an error is indicated
+// by failing the future, thus only a 'size_t' is necessary rather
+// than a 'ssize_t').
Future<size_t> read(int fd, void* data, size_t size);
@@ -38,6 +42,21 @@ Future<size_t> read(int fd, void* data, size_t size);
// so that EOF can be reached.
Future<std::string> read(int fd);
+
+// Performs a non-blocking write by polling on the specified file
+// descriptor until data can be written. The future will become
+// ready when some data is written with the number of bytes that were
+// written. This may be less than the specified size of the data. A
+// failure will be returned if an error is detected. In the special
+// case of writing to a socket or pipe, an error will be returned if
+// the read end of the socket or pipe has been closed.
+Future<size_t> write(int fd, void* data, size_t size);
+
+
+// Performs a series of asynchronous writes until all of data has been
+// written or an error occurred in which case a failure is returned.
+Future<Nothing> write(int fd, const std::string& data);
+
} // namespace io {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/659a91f0/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 59ebeb3..27765fa 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -3443,17 +3443,23 @@ namespace io {
namespace internal {
-void read(int fd,
- void* data,
- size_t size,
- const memory::shared_ptr<Promise<size_t> >& promise,
- const Future<short>& future)
+void read(
+ int fd,
+ void* data,
+ size_t size,
+ const memory::shared_ptr<Promise<size_t> >& promise,
+ const Future<short>& future)
{
// Ignore this function if the read operation has been cancelled.
if (promise->future().isDiscarded()) {
return;
}
+ if (size == 0) {
+ promise->set(0);
+ return;
+ }
+
// Since promise->future() will be discarded before future is
// discarded, we should never see a discarded future here because of
// the check in the beginning of this function.
@@ -3467,12 +3473,105 @@ void read(int fd,
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
// Restart the read operation.
poll(fd, process::io::READ).onAny(
- lambda::bind(&internal::read, fd, data, size, promise, lambda::_1));
+ lambda::bind(&internal::read,
+ fd,
+ data,
+ size,
+ promise,
+ lambda::_1));
+ } else {
+ // Error occurred.
+ promise->fail(strerror(errno));
+ }
+ } else {
+ promise->set(length);
+ }
+ }
+}
+
+
+void write(
+ int fd,
+ void* data,
+ size_t size,
+ const memory::shared_ptr<Promise<size_t> >& promise,
+ const Future<short>& future)
+{
+ // Ignore this function if the write operation has been cancelled.
+ if (promise->future().isDiscarded()) {
+ return;
+ }
+
+ if (size == 0) {
+ promise->set(0);
+ return;
+ }
+
+ // Since promise->future() will be discarded before future is
+ // discarded, we should never see a discarded future here because of
+ // the check in the beginning of this function.
+ CHECK(!future.isDiscarded());
+
+ if (future.isFailed()) {
+ promise->fail(future.failure());
+ } else {
+ // Do a write but ignore SIGPIPE so we can return an error when
+ // writing to a pipe or socket where the reading end is closed.
+ // TODO(benh): The 'suppress' macro failed to work on OS X as it
+ // appears that signal delivery was happening asynchronously.
+ // That is, the signal would not appear to be pending when the
+ // 'suppress' block was closed thus the destructor for
+ // 'Suppressor' was not waiting/removing the signal via 'sigwait'.
+ // It also appeared that the signal would be delivered to another
+ // thread even if it remained blocked in this thread. The
+ // workaround here is to check explicitly for EPIPE and then do
+ // 'sigwait' regardless of what 'os::signals::pending' returns. We
+ // don't have that luxury with 'Suppressor' and arbitrary signals
+ // because we don't always have something like EPIPE to tell us
+ // that a signal is (or will soon be) pending.
+ bool pending = os::signals::pending(SIGPIPE);
+ bool unblock = !pending ? os::signals::block(SIGPIPE) : false;
+
+ ssize_t length = ::write(fd, data, size);
+
+ // Save the errno so we can restore it after doing sig* functions
+ // below.
+ int errno_ = errno;
+
+ if (length < 0 && errno == EPIPE && !pending) {
+ sigset_t mask;
+ sigemptyset(&mask);
+ sigaddset(&mask, SIGPIPE);
+
+ int result;
+ do {
+ int ignored;
+ result = sigwait(&mask, &ignored);
+ } while (result == -1 && errno == EINTR);
+ }
+
+ if (unblock) {
+ os::signals::unblock(SIGPIPE);
+ }
+
+ errno = errno_;
+
+ if (length < 0) {
+ if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+ // Restart the write operation.
+ poll(fd, process::io::WRITE).onAny(
+ lambda::bind(&internal::write,
+ fd,
+ data,
+ size,
+ promise,
+ lambda::_1));
} else {
// Error occurred.
promise->fail(strerror(errno));
}
} else {
+ // TODO(benh): Retry if 'length' is 0?
promise->set(length);
}
}
@@ -3518,39 +3617,71 @@ Future<size_t> read(int fd, void* data, size_t size)
// Check the file descriptor.
Try<bool> nonblock = os::isNonblock(fd);
if (nonblock.isError()) {
- // The file descriptor is not valid (e.g. fd has been closed).
- promise->fail(string("Failed to check O_NONBLOCK") + strerror(errno));
+ // The file descriptor is not valid (e.g., has been closed).
+ promise->fail(
+ "Failed to check if file descriptor was non-blocking: " +
+ string(strerror(errno)));
return promise->future();
} else if (!nonblock.get()) {
- // The fd is not opened with O_NONBLOCK set.
- promise->fail("Please use a fd opened with O_NONBLOCK set");
+ // The file descriptor is not non-blocking.
+ promise->fail("Expected a non-blocking file descriptor");
return promise->future();
}
- if (size == 0) {
- promise->fail("Try to read nothing");
+ // Because the file descriptor is non-blocking, we call read()
+ // immediately. The read may in turn call poll if necessary,
+ // avoiding unnecessary polling. We also observed that for some
+ // combination of libev and Linux kernel versions, the poll would
+ // block for non-deterministically long periods of time. This may be
+ // fixed in a newer version of libev (we use 3.8 at the time of
+ // writing this comment).
+ internal::read(fd, data, size, promise, io::READ);
+
+ return promise->future();
+}
+
+
+Future<size_t> write(int fd, void* data, size_t size)
+{
+ process::initialize();
+
+ memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
+
+ // Check the file descriptor.
+ Try<bool> nonblock = os::isNonblock(fd);
+ if (nonblock.isError()) {
+ // The file descriptor is not valid (e.g., has been closed).
+ promise->fail(
+ "Failed to check if file descriptor was non-blocking: " +
+ string(strerror(errno)));
+ return promise->future();
+ } else if (!nonblock.get()) {
+ // The file descriptor is not non-blocking.
+ promise->fail("Expected a non-blocking file descriptor");
return promise->future();
}
- // Because the file descriptor is non-blocking, we call read()
- // immediately. The read may in turn call poll if needed, avoiding
- // unnecessary polling. We also observed that for some combination
- // of libev and Linux kernel versions, the poll would block for
- // non-deterministically long periods of time. This may be fixed in
- // a newer version of libev (we use 3.8 at the time of writing this
- // comment).
- internal::read(fd, data, size, promise, io::READ);
+ // Because the file descriptor is non-blocking, we call write()
+ // immediately. The write may in turn call poll if necessary,
+ // avoiding unnecessary polling. We also observed that for some
+ // combination of libev and Linux kernel versions, the poll would
+ // block for non-deterministically long periods of time. This may be
+ // fixed in a newer version of libev (we use 3.8 at the time of
+ // writing this comment).
+ internal::write(fd, data, size, promise, io::WRITE);
return promise->future();
}
+
namespace internal {
#if __cplusplus >= 201103L
-Future<string> _read(int fd,
- const memory::shared_ptr<string>& buffer,
- const boost::shared_array<char>& data,
- size_t length)
+Future<string> _read(
+ int fd,
+ const memory::shared_ptr<string>& buffer,
+ const boost::shared_array<char>& data,
+ size_t length)
{
return io::read(fd, data.get(), length)
.then([=] (size_t size) -> Future<string> {
@@ -3563,10 +3694,11 @@ Future<string> _read(int fd,
}
#else
// Forward declaration.
-Future<string> _read(int fd,
- const memory::shared_ptr<string>& buffer,
- const boost::shared_array<char>& data,
- size_t length);
+Future<string> _read(
+ int fd,
+ const memory::shared_ptr<string>& buffer,
+ const boost::shared_array<char>& data,
+ size_t length);
Future<string> __read(
@@ -3587,16 +3719,34 @@ Future<string> __read(
}
-Future<string> _read(int fd,
- const memory::shared_ptr<string>& buffer,
- const boost::shared_array<char>& data,
- size_t length)
+Future<string> _read(
+ int fd,
+ const memory::shared_ptr<string>& buffer,
+ const boost::shared_array<char>& data,
+ size_t length)
{
return io::read(fd, data.get(), length)
.then(lambda::bind(&__read, lambda::_1, fd, buffer, data, length));
}
#endif // __cplusplus >= 201103L
+
+#if __cplusplus >= 201103L
+Future<Nothing> _write(
+ int fd,
+ Owned<string> data,
+ size_t index)
+{
+ return io::write(fd, (void*) (data->data() + index), data->size() - index)
+ .then([=] (size_t length) -> Future<Nothing> {
+ if (index + length == data->size()) {
+ return Nothing();
+ }
+ return _write(fd, data, index + length);
+ });
+}
+#endif // __cplusplus >= 201103L
+
} // namespace internal
@@ -3613,6 +3763,13 @@ Future<string> read(int fd)
}
+Future<Nothing> write(int fd, const std::string& data)
+{
+ process::initialize();
+
+ return internal::_write(fd, Owned<string>(new string(data)), 0);
+}
+
} // namespace io {
@@ -3712,33 +3869,18 @@ Future<Response> request(
<< body.get();
}
- // TODO(bmahler): Use benh's async write when it gets committed.
- const string& data = out.str();
- int remaining = data.size();
-
- while (remaining > 0) {
- int n = write(s, data.data() + (data.size() - remaining), remaining);
-
- if (n < 0) {
- if (errno == EINTR) {
- continue;
- }
- os::close(s);
- return Failure(string("Failed to write: ") + strerror(errno));
- }
-
- remaining -= n;
- }
-
Try<Nothing> nonblock = os::nonblock(s);
if (!nonblock.isSome()) {
os::close(s);
return Failure("Failed to set nonblock: " + nonblock.error());
}
- // Decode once the async read completes.
- return io::read(s)
- .then(lambda::bind(&internal::decode, lambda::_1))
+ return io::write(s, out.str())
+ .then([=] () {
+ // Decode once the async read completes.
+ return io::read(s)
+ .then(lambda::bind(&internal::decode, lambda::_1));
+ })
.onAny(lambda::bind(&os::close, s));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/659a91f0/3rdparty/libprocess/src/tests/io_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp
index ee5b0b4..edf7066 100644
--- a/3rdparty/libprocess/src/tests/io_tests.cpp
+++ b/3rdparty/libprocess/src/tests/io_tests.cpp
@@ -61,7 +61,7 @@ TEST(IO, Read)
ASSERT_SOME(os::nonblock(pipes[1]));
// Test reading nothing.
- AWAIT_EXPECT_FAILED(io::read(pipes[0], data, 0));
+ AWAIT_EXPECT_EQ(0, io::read(pipes[0], data, 0));
// Test successful read.
Future<size_t> future = io::read(pipes[0], data, 3);
@@ -162,3 +162,108 @@ TEST(IO, BufferedRead)
ASSERT_SOME(os::rm("file"));
}
+
+
+TEST(IO, Write)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ int pipes[2];
+
+ // Create a blocking pipe.
+ ASSERT_NE(-1, ::pipe(pipes));
+
+ // Test on a blocking file descriptor.
+ AWAIT_EXPECT_FAILED(io::write(pipes[1], (void*) "hi", 2));
+
+ close(pipes[0]);
+ close(pipes[1]);
+
+ // Test on a closed file descriptor.
+ AWAIT_EXPECT_FAILED(io::write(pipes[1], (void*) "hi", 2));
+
+ // Create a nonblocking pipe.
+ ASSERT_NE(-1, ::pipe(pipes));
+ ASSERT_SOME(os::nonblock(pipes[0]));
+ ASSERT_SOME(os::nonblock(pipes[1]));
+
+ // Test writing nothing.
+ AWAIT_EXPECT_EQ(0, io::write(pipes[1], (void*) "hi", 0));
+
+ // Test successful write.
+ AWAIT_EXPECT_EQ(2, io::write(pipes[1], (void*) "hi", 2));
+
+ char data[2];
+ AWAIT_EXPECT_EQ(2, io::read(pipes[0], data, 2));
+ EXPECT_EQ("hi", string(data, 2));
+
+ // Test write to broken pipe.
+ close(pipes[0]);
+ AWAIT_EXPECT_FAILED(io::write(pipes[1], (void*) "hi", 2));
+
+ close(pipes[1]);
+}
+
+
+TEST(IO, BlockingWrite)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ int pipes[2];
+
+ // Create a nonblocking pipe.
+ ASSERT_NE(-1, ::pipe(pipes));
+ ASSERT_SOME(os::nonblock(pipes[0]));
+ ASSERT_SOME(os::nonblock(pipes[1]));
+
+ // Determine the pipe buffer size by writing until we block.
+ size_t size = 0;
+ ssize_t length = 0;
+ while ((length = ::write(pipes[1], "data", 4)) >= 0) {
+ size += length;
+ }
+
+ ASSERT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK);
+
+ close(pipes[0]);
+ close(pipes[1]);
+
+ // Recreate a nonblocking pipe.
+ ASSERT_NE(-1, ::pipe(pipes));
+ ASSERT_SOME(os::nonblock(pipes[0]));
+ ASSERT_SOME(os::nonblock(pipes[1]));
+
+ // Create 8 pipe buffers worth of data. Try and write all the data
+ // at once. Check that the future is pending after doing the
+ // write. Then read 128 bytes and make sure the write remains
+ // pending.
+
+ string data = "data"; // 4 Bytes.
+ ASSERT_EQ(4u, data.size());
+
+ while (data.size() < (8 * size)) {
+ data.append(data);
+ }
+
+ Future<Nothing> future = io::write(pipes[1], data);
+
+ ASSERT_TRUE(future.isPending());
+
+ // Check after reading some data the write remains pending.
+ char temp[128];
+ AWAIT_EXPECT_EQ(128, io::read(pipes[0], temp, 128));
+
+ ASSERT_TRUE(future.isPending());
+
+ length = 128;
+
+ while (length < data.size()) {
+ AWAIT_EXPECT_EQ(128, io::read(pipes[0], temp, 128));
+ length += 128;
+ }
+
+ AWAIT_EXPECT_READY(future);
+
+ close(pipes[0]);
+ close(pipes[1]);
+}
[4/5] git commit: Added io::splice (C++11).
Posted by be...@apache.org.
Added io::splice (C++11).
Review: https://reviews.apache.org/r/17620
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f42e6080
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f42e6080
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f42e6080
Branch: refs/heads/master
Commit: f42e60804a4e345319cace9db46856f53a40a69c
Parents: 61ec702
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jan 31 13:38:00 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Feb 5 10:58:36 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/io.hpp | 7 +++
3rdparty/libprocess/src/process.cpp | 48 ++++++++++++++
3rdparty/libprocess/src/tests/io_tests.cpp | 83 +++++++++++++++++++++++++
3 files changed, 138 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f42e6080/3rdparty/libprocess/include/process/io.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp
index d16ab01..7f9b242 100644
--- a/3rdparty/libprocess/include/process/io.hpp
+++ b/3rdparty/libprocess/include/process/io.hpp
@@ -57,6 +57,13 @@ Future<size_t> write(int fd, void* data, size_t size);
// written or an error occurred in which case a failure is returned.
Future<Nothing> write(int fd, const std::string& data);
+
+// Splices data from one file descriptor to another. Returns when
+// end-of-file is reached on the input file descriptor or returns a
+// failure if an error occurred while reading or writing. Note that
+// both the 'from' and 'to' file descriptors must be non-blocking.
+Future<Nothing> splice(int from, int to, size_t chunk = 4096);
+
} // namespace io {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f42e6080/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index bc111c6..e422384 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -3775,6 +3775,36 @@ Future<Nothing> _write(
}
#endif // __cplusplus >= 201103L
+
+#if __cplusplus >= 201103L
+void _splice(
+ int from,
+ int to,
+ size_t chunk,
+ const boost::shared_array<char>& data,
+ Owned<Promise<Nothing>> promise)
+{
+ // Note that only one of io::read or io::write is outstanding at any
+ // one point in time thus the reuse of 'data' for both operations.
+ io::read(from, data.get(), chunk)
+ .onReady([=] (size_t size) {
+ if (size == 0) { // EOF.
+ promise->set(Nothing());
+ } else {
+ io::write(to, string(data.get(), size))
+ .onReady([=] () {
+ _splice(from, to, chunk, data, promise);
+ })
+ .onFailed([=] (const string& message) { promise->fail(message); })
+ .onDiscarded([=] () { promise->future().discard(); });
+ }
+ })
+ .onFailed([=] (const string& message) { promise->fail(message); })
+ .onDiscarded([=] () { promise->future().discard(); });
+}
+#else
+#endif // __cplusplus >= 201103L
+
} // namespace internal
@@ -3798,6 +3828,24 @@ Future<Nothing> write(int fd, const std::string& data)
return internal::_write(fd, Owned<string>(new string(data)), 0);
}
+
+Future<Nothing> splice(int from, int to, size_t chunk)
+{
+ boost::shared_array<char> data(new char[chunk]);
+
+ // Rather than having internal::_splice return a future and
+ // implementing internal::_splice as a chain of io::read and
+ // io::write calls, we use an explicit promise that we pass around
+ // so that we don't increase memory usage the longer that we splice.
+ Owned<Promise<Nothing> > promise(new Promise<Nothing>());
+
+ Future<Nothing> future = promise->future();
+
+ internal::_splice(from, to, chunk, data, promise);
+
+ return future;
+}
+
} // namespace io {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f42e6080/3rdparty/libprocess/src/tests/io_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp
index edf7066..eab7864 100644
--- a/3rdparty/libprocess/src/tests/io_tests.cpp
+++ b/3rdparty/libprocess/src/tests/io_tests.cpp
@@ -267,3 +267,86 @@ TEST(IO, BlockingWrite)
close(pipes[0]);
close(pipes[1]);
}
+
+
+TEST(IO, splice)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ // Create a temporary file for splicing into.
+ Try<string> path = os::mktemp();
+ ASSERT_SOME(path);
+
+ Try<int> fd = os::open(
+ path.get(),
+ O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+ ASSERT_SOME(fd);
+
+ ASSERT_SOME(os::nonblock(fd.get()));
+
+ // Use a pipe for doing the splicing.
+ int pipes[2];
+
+ // Start with a blocking pipe.
+ ASSERT_NE(-1, ::pipe(pipes));
+
+ // Test splicing on a blocking file descriptor.
+ AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
+
+ close(pipes[0]);
+ close(pipes[1]);
+
+ // Test on a closed file descriptor.
+ AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
+
+ // Now create a nonblocking pipe.
+ ASSERT_NE(-1, ::pipe(pipes));
+ ASSERT_SOME(os::nonblock(pipes[0]));
+ ASSERT_SOME(os::nonblock(pipes[1]));
+
+ // Test write to broken pipe.
+ close(pipes[0]);
+ AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
+
+ close(pipes[1]);
+
+ // Recreate a nonblocking pipe.
+ ASSERT_NE(-1, ::pipe(pipes));
+ ASSERT_SOME(os::nonblock(pipes[0]));
+ ASSERT_SOME(os::nonblock(pipes[1]));
+
+ // Now write data to the pipe and splice to the file.
+
+ string data =
+ "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+ "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
+ "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+ "aliquip ex ea commodo consequat. Duis aute irure dolor in "
+ "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
+ "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
+ "culpa qui officia deserunt mollit anim id est laborum.";
+
+ // Create more data!
+ while (Bytes(data.size()) < Megabytes(1)) {
+ data.append(data);
+ }
+
+ Future<Nothing> spliced = io::splice(pipes[0], fd.get());
+
+ AWAIT_READY(io::write(pipes[1], data));
+
+ // Closing the write pipe should cause an EOF on the read end, thus
+ // completing 'spliced'.
+ close(pipes[1]);
+
+ AWAIT_READY(spliced);
+
+ close(pipes[0]);
+
+ os::close(fd.get());
+
+ // Now make sure all the data is there!
+ Try<string> read = os::read(path.get());
+ ASSERT_SOME(read);
+ EXPECT_EQ(data, read.get());
+}
[3/5] git commit: Properly initialized signal sets.
Posted by be...@apache.org.
Properly initialized signal sets.
Review: https://reviews.apache.org/r/17617
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e24f5416
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e24f5416
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e24f5416
Branch: refs/heads/master
Commit: e24f54161b4701a8c383943709ae1524c6ca7903
Parents: 129ad59
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jan 31 12:03:30 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Feb 5 10:58:36 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e24f5416/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp
index 3b4e014..f32130a 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp
@@ -38,6 +38,7 @@ inline bool pending(int signal)
inline bool block(int signal)
{
sigset_t set;
+ sigemptyset(&set);
sigaddset(&set, signal);
sigset_t oldset;
@@ -56,6 +57,7 @@ inline bool block(int signal)
inline bool unblock(int signal)
{
sigset_t set;
+ sigemptyset(&set);
sigaddset(&set, signal);
sigset_t oldset;