You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/02/05 19:59:12 UTC

[1/5] git commit: Refactored io::splice for C++03.

Updated Branches:
  refs/heads/master 129ad594b -> cbba5d178


Refactored io::splice for C++03.

Review: https://reviews.apache.org/r/17621


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cbba5d17
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cbba5d17
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cbba5d17

Branch: refs/heads/master
Commit: cbba5d178c6d49040a8bb62566f024af2bf0089c
Parents: f42e608
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jan 31 13:58:21 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Feb 5 10:58:36 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 57 ++++++++++++++++++++++++++++++--
 1 file changed, 55 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cbba5d17/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index e422384..2e7764a 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -3782,7 +3782,7 @@ void _splice(
     int to,
     size_t chunk,
     const boost::shared_array<char>& data,
-    Owned<Promise<Nothing>> promise)
+    memory::shared_ptr<Promise<Nothing>> promise)
 {
   // Note that only one of io::read or io::write is outstanding at any
   // one point in time thus the reuse of 'data' for both operations.
@@ -3803,6 +3803,59 @@ void _splice(
     .onDiscarded([=] () { promise->future().discard(); });
 }
 #else
+// Forward declarations.
+void __splice(
+    int from,
+    int to,
+    size_t chunk,
+    boost::shared_array<char> data,
+    memory::shared_ptr<Promise<Nothing> > promise,
+    size_t size);
+
+void ___splice(
+    memory::shared_ptr<Promise<Nothing> > promise,
+    const string& message);
+
+
+void _splice(
+    int from,
+    int to,
+    size_t chunk,
+    boost::shared_array<char> data,
+    memory::shared_ptr<Promise<Nothing> > promise)
+{
+  io::read(from, data.get(), chunk)
+    .onReady(lambda::bind(&__splice, from, to, chunk, data, promise, lambda::_1))
+    .onFailed(lambda::bind(&___splice, promise, lambda::_1))
+    .onDiscarded(lambda::bind(&Future<Nothing>::discard, promise->future()));
+}
+
+
+void __splice(
+    int from,
+    int to,
+    size_t chunk,
+    boost::shared_array<char> data,
+    memory::shared_ptr<Promise<Nothing> > promise,
+    size_t size)
+{
+  if (size == 0) { // EOF.
+    promise->set(Nothing());
+  } else {
+    io::write(to, string(data.get(), size))
+      .onReady(lambda::bind(&_splice, from, to, chunk, data, promise))
+      .onFailed(lambda::bind(&___splice, promise, lambda::_1))
+      .onDiscarded(lambda::bind(&Future<Nothing>::discard, promise->future()));
+  }
+}
+
+
+void ___splice(
+    memory::shared_ptr<Promise<Nothing> > promise,
+    const string& message)
+{
+  promise->fail(message);
+}
 #endif // __cplusplus >= 201103L
 
 } // namespace internal
@@ -3837,7 +3890,7 @@ Future<Nothing> splice(int from, int to, size_t chunk)
   // implementing internal::_splice as a chain of io::read and
   // io::write calls, we use an explicit promise that we pass around
   // so that we don't increase memory usage the longer that we splice.
-  Owned<Promise<Nothing> > promise(new Promise<Nothing>());
+  memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
 
   Future<Nothing> future = promise->future();
 


[2/5] git commit: Refactored io::write for C++03.

Posted by be...@apache.org.
Refactored io::write for C++03.

Review: https://reviews.apache.org/r/17619


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/61ec7026
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/61ec7026
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/61ec7026

Branch: refs/heads/master
Commit: 61ec7026523fde25d9c403f42e89e83dde9d62df
Parents: 659a91f
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jan 31 13:14:31 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Feb 5 10:58:36 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 42 ++++++++++++++++++++++++++------
 1 file changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/61ec7026/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 27765fa..bc111c6 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -3702,8 +3702,7 @@ Future<string> _read(
 
 
 Future<string> __read(
-    const size_t& size,
-    // TODO(benh): Remove 'const &' after fixing libprocess.
+    size_t size,
     int fd,
     const memory::shared_ptr<string>& buffer,
     const boost::shared_array<char>& data,
@@ -3745,6 +3744,35 @@ Future<Nothing> _write(
       return _write(fd, data, index + length);
     });
 }
+#else
+// Forward declaration.
+Future<Nothing> _write(
+    int fd,
+    Owned<string> data,
+    size_t index);
+
+
+Future<Nothing> __write(
+    int fd,
+    Owned<string> data,
+    size_t index,
+    size_t length)
+{
+  if (index + length == data->size()) {
+    return Nothing();
+  }
+  return _write(fd, data, index + length);
+}
+
+
+Future<Nothing> _write(
+    int fd,
+    Owned<string> data,
+    size_t index)
+{
+  return io::write(fd, (void*) (data->data() + index), data->size() - index)
+    .then(lambda::bind(&__write, fd, data, index, lambda::_1));
+}
 #endif // __cplusplus >= 201103L
 
 } // namespace internal
@@ -3875,12 +3903,12 @@ Future<Response> request(
     return Failure("Failed to set nonblock: " + nonblock.error());
   }
 
+  // Need to disambiguate the io::read we want when binding below.
+  Future<string> (*read)(int) = io::read;
+
   return io::write(s, out.str())
-    .then([=] () {
-      // Decode once the async read completes.
-      return io::read(s)
-        .then(lambda::bind(&internal::decode, lambda::_1));
-    })
+    .then(lambda::bind(read, s))
+    .then(lambda::bind(&internal::decode, lambda::_1))
     .onAny(lambda::bind(&os::close, s));
 }
 


[5/5] git commit: Added io::write (in C++11).

Posted by be...@apache.org.
Added io::write (in C++11).

Review: https://reviews.apache.org/r/17618


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/659a91f0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/659a91f0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/659a91f0

Branch: refs/heads/master
Commit: 659a91f067df70c5009934010d8c5b3d6860aeb6
Parents: e24f541
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jan 31 12:56:00 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Feb 5 10:58:36 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/io.hpp |  35 +++-
 3rdparty/libprocess/src/process.cpp        | 248 +++++++++++++++++++-----
 3rdparty/libprocess/src/tests/io_tests.cpp | 107 +++++++++-
 3 files changed, 328 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/659a91f0/3rdparty/libprocess/include/process/io.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp
index 8cf3244..d16ab01 100644
--- a/3rdparty/libprocess/include/process/io.hpp
+++ b/3rdparty/libprocess/include/process/io.hpp
@@ -6,6 +6,8 @@
 
 #include <process/future.hpp>
 
+#include <stout/nothing.hpp>
+
 namespace process {
 namespace io {
 
@@ -22,14 +24,16 @@ const size_t BUFFERED_READ_SIZE = 16*4096;
 Future<short> poll(int fd, short events);
 
 
-// Performs a single non-blocking read by polling on the specified file
-// descriptor until any data can be be read. The future will become ready when
-// some data is read (may be less than that specified by size). A future failure
-// will be returned if an error is detected. If end-of-file is reached, value
-// zero will be returned. Note that the return type of this function differs
-// from the standard 'read'. In particular, this function returns the number of
-// bytes read or zero on end-of-file (an error is indicated by failing the
-// future, thus only a 'size_t' is necessary rather than a 'ssize_t').
+// Performs a single non-blocking read by polling on the specified
+// file descriptor until any data can be read. The future will
+// become ready when some data is read (may be less than that
+// specified by size). A failure will be returned if an error is
+// detected. If end-of-file is reached, value zero will be
+// returned. Note that the return type of this function differs from
+// the standard 'read'. In particular, this function returns the
+// number of bytes read or zero on end-of-file (an error is indicated
+// by failing the future, thus only a 'size_t' is necessary rather
+// than a 'ssize_t').
 Future<size_t> read(int fd, void* data, size_t size);
 
 
@@ -38,6 +42,21 @@ Future<size_t> read(int fd, void* data, size_t size);
 // so that EOF can be reached.
 Future<std::string> read(int fd);
 
+
+// Performs a non-blocking write by polling on the specified file
+// descriptor until data can be written. The future will become
+// ready when some data is written with the number of bytes that were
+// written. This may be less than the specified size of the data. A
+// failure will be returned if an error is detected. In the special
+// case of writing to a socket or pipe, an error will be returned if
+// the read end of the socket or pipe has been closed.
+Future<size_t> write(int fd, void* data, size_t size);
+
+
+// Performs a series of asynchronous writes until all of data has been
+// written or an error occured in which case a failure is returned.
+Future<Nothing> write(int fd, const std::string& data);
+
 } // namespace io {
 } // namespace process {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/659a91f0/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 59ebeb3..27765fa 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -3443,17 +3443,23 @@ namespace io {
 
 namespace internal {
 
-void read(int fd,
-          void* data,
-          size_t size,
-          const memory::shared_ptr<Promise<size_t> >& promise,
-          const Future<short>& future)
+void read(
+    int fd,
+    void* data,
+    size_t size,
+    const memory::shared_ptr<Promise<size_t> >& promise,
+    const Future<short>& future)
 {
   // Ignore this function if the read operation has been cancelled.
   if (promise->future().isDiscarded()) {
     return;
   }
 
+  if (size == 0) {
+    promise->set(0);
+    return;
+  }
+
   // Since promise->future() will be discarded before future is
   // discarded, we should never see a discarded future here because of
   // the check in the beginning of this function.
@@ -3467,12 +3473,105 @@ void read(int fd,
       if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
         // Restart the read operation.
         poll(fd, process::io::READ).onAny(
-            lambda::bind(&internal::read, fd, data, size, promise, lambda::_1));
+            lambda::bind(&internal::read,
+                         fd,
+                         data,
+                         size,
+                         promise,
+                         lambda::_1));
+      } else {
+        // Error occurred.
+        promise->fail(strerror(errno));
+      }
+    } else {
+      promise->set(length);
+    }
+  }
+}
+
+
+void write(
+    int fd,
+    void* data,
+    size_t size,
+    const memory::shared_ptr<Promise<size_t> >& promise,
+    const Future<short>& future)
+{
+  // Ignore this function if the write operation has been cancelled.
+  if (promise->future().isDiscarded()) {
+    return;
+  }
+
+  if (size == 0) {
+    promise->set(0);
+    return;
+  }
+
+  // Since promise->future() will be discarded before future is
+  // discarded, we should never see a discarded future here because of
+  // the check in the beginning of this function.
+  CHECK(!future.isDiscarded());
+
+  if (future.isFailed()) {
+    promise->fail(future.failure());
+  } else {
+    // Do a write but ignore SIGPIPE so we can return an error when
+    // writing to a pipe or socket where the reading end is closed.
+    // TODO(benh): The 'suppress' macro failed to work on OS X as it
+    // appears that signal delivery was happening asynchronously.
+    // That is, the signal would not appear to be pending when the
+    // 'suppress' block was closed thus the destructor for
+    // 'Suppressor' was not waiting/removing the signal via 'sigwait'.
+    // It also appeared that the signal would be delivered to another
+    // thread even if it remained blocked in this thread. The
+    // workaround here is to check explicitly for EPIPE and then do
+    // 'sigwait' regardless of what 'os::signals::pending' returns. We
+    // don't have that luxury with 'Suppressor' and arbitrary signals
+    // because we don't always have something like EPIPE to tell us
+    // that a signal is (or will soon be) pending.
+    bool pending = os::signals::pending(SIGPIPE);
+    bool unblock = !pending ? os::signals::block(SIGPIPE) : false;
+
+    ssize_t length = ::write(fd, data, size);
+
+    // Save the errno so we can restore it after doing sig* functions
+    // below.
+    int errno_ = errno;
+
+    if (length < 0 && errno == EPIPE && !pending) {
+      sigset_t mask;
+      sigemptyset(&mask);
+      sigaddset(&mask, SIGPIPE);
+
+      int result;
+      do {
+        int ignored;
+        result = sigwait(&mask, &ignored);
+      } while (result == -1 && errno == EINTR);
+    }
+
+    if (unblock) {
+      os::signals::unblock(SIGPIPE);
+    }
+
+    errno = errno_;
+
+    if (length < 0) {
+      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+        // Restart the write operation.
+        poll(fd, process::io::WRITE).onAny(
+            lambda::bind(&internal::write,
+                         fd,
+                         data,
+                         size,
+                         promise,
+                         lambda::_1));
       } else {
         // Error occurred.
         promise->fail(strerror(errno));
       }
     } else {
+      // TODO(benh): Retry if 'length' is 0?
       promise->set(length);
     }
   }
@@ -3518,39 +3617,71 @@ Future<size_t> read(int fd, void* data, size_t size)
   // Check the file descriptor.
   Try<bool> nonblock = os::isNonblock(fd);
   if (nonblock.isError()) {
-    // The file descriptor is not valid (e.g. fd has been closed).
-    promise->fail(string("Failed to check O_NONBLOCK") + strerror(errno));
+    // The file descriptor is not valid (e.g., has been closed).
+    promise->fail(
+        "Failed to check if file descriptor was non-blocking: " +
+        string(strerror(errno)));
     return promise->future();
   } else if (!nonblock.get()) {
-    // The fd is not opened with O_NONBLOCK set.
-    promise->fail("Please use a fd opened with O_NONBLOCK set");
+    // The file descriptor is not non-blocking.
+    promise->fail("Expected a non-blocking file descriptor");
     return promise->future();
   }
 
-  if (size == 0) {
-    promise->fail("Try to read nothing");
+  // Because the file descriptor is non-blocking, we call read()
+  // immediately. The read may in turn call poll if necessary,
+  // avoiding unnecessary polling. We also observed that for some
+  // combination of libev and Linux kernel versions, the poll would
+  // block for non-deterministically long periods of time. This may be
+  // fixed in a newer version of libev (we use 3.8 at the time of
+  // writing this comment).
+  internal::read(fd, data, size, promise, io::READ);
+
+  return promise->future();
+}
+
+
+Future<size_t> write(int fd, void* data, size_t size)
+{
+  process::initialize();
+
+  memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
+
+  // Check the file descriptor.
+  Try<bool> nonblock = os::isNonblock(fd);
+  if (nonblock.isError()) {
+    // The file descriptor is not valid (e.g., has been closed).
+    promise->fail(
+        "Failed to check if file descriptor was non-blocking: " +
+        string(strerror(errno)));
+    return promise->future();
+  } else if (!nonblock.get()) {
+    // The file descriptor is not non-blocking.
+    promise->fail("Expected a non-blocking file descriptor");
     return promise->future();
   }
 
-  // Because the file descriptor is non-blocking, we call read()
-  // immediately. The read may in turn call poll if needed, avoiding
-  // unnecessary polling. We also observed that for some combination
-  // of libev and Linux kernel versions, the poll would block for
-  // non-deterministically long periods of time. This may be fixed in
-  // a newer version of libev (we use 3.8 at the time of writing this
-  // comment).
-  internal::read(fd, data, size, promise, io::READ);
+  // Because the file descriptor is non-blocking, we call write()
+  // immediately. The write may in turn call poll if necessary,
+  // avoiding unnecessary polling. We also observed that for some
+  // combination of libev and Linux kernel versions, the poll would
+  // block for non-deterministically long periods of time. This may be
+  // fixed in a newer version of libev (we use 3.8 at the time of
+  // writing this comment).
+  internal::write(fd, data, size, promise, io::WRITE);
 
   return promise->future();
 }
 
+
 namespace internal {
 
 #if __cplusplus >= 201103L
-Future<string> _read(int fd,
-                     const memory::shared_ptr<string>& buffer,
-                     const boost::shared_array<char>& data,
-                     size_t length)
+Future<string> _read(
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length)
 {
   return io::read(fd, data.get(), length)
     .then([=] (size_t size) -> Future<string> {
@@ -3563,10 +3694,11 @@ Future<string> _read(int fd,
 }
 #else
 // Forward declataion.
-Future<string> _read(int fd,
-                     const memory::shared_ptr<string>& buffer,
-                     const boost::shared_array<char>& data,
-                     size_t length);
+Future<string> _read(
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length);
 
 
 Future<string> __read(
@@ -3587,16 +3719,34 @@ Future<string> __read(
 }
 
 
-Future<string> _read(int fd,
-                     const memory::shared_ptr<string>& buffer,
-                     const boost::shared_array<char>& data,
-                     size_t length)
+Future<string> _read(
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length)
 {
   return io::read(fd, data.get(), length)
     .then(lambda::bind(&__read, lambda::_1, fd, buffer, data, length));
 }
 #endif // __cplusplus >= 201103L
 
+
+#if __cplusplus >= 201103L
+Future<Nothing> _write(
+    int fd,
+    Owned<string> data,
+    size_t index)
+{
+  return io::write(fd, (void*) (data->data() + index), data->size() - index)
+    .then([=] (size_t length) -> Future<Nothing> {
+      if (index + length == data->size()) {
+        return Nothing();
+      }
+      return _write(fd, data, index + length);
+    });
+}
+#endif // __cplusplus >= 201103L
+
 } // namespace internal
 
 
@@ -3613,6 +3763,13 @@ Future<string> read(int fd)
 }
 
 
+Future<Nothing> write(int fd, const std::string& data)
+{
+  process::initialize();
+
+  return internal::_write(fd, Owned<string>(new string(data)), 0);
+}
+
 } // namespace io {
 
 
@@ -3712,33 +3869,18 @@ Future<Response> request(
         << body.get();
   }
 
-  // TODO(bmahler): Use benh's async write when it gets committed.
-  const string& data = out.str();
-  int remaining = data.size();
-
-  while (remaining > 0) {
-    int n = write(s, data.data() + (data.size() - remaining), remaining);
-
-    if (n < 0) {
-      if (errno == EINTR) {
-        continue;
-      }
-      os::close(s);
-      return Failure(string("Failed to write: ") + strerror(errno));
-    }
-
-    remaining -= n;
-  }
-
   Try<Nothing> nonblock = os::nonblock(s);
   if (!nonblock.isSome()) {
     os::close(s);
     return Failure("Failed to set nonblock: " + nonblock.error());
   }
 
-  // Decode once the async read completes.
-  return io::read(s)
-    .then(lambda::bind(&internal::decode, lambda::_1))
+  return io::write(s, out.str())
+    .then([=] () {
+      // Decode once the async read completes.
+      return io::read(s)
+        .then(lambda::bind(&internal::decode, lambda::_1));
+    })
     .onAny(lambda::bind(&os::close, s));
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/659a91f0/3rdparty/libprocess/src/tests/io_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp
index ee5b0b4..edf7066 100644
--- a/3rdparty/libprocess/src/tests/io_tests.cpp
+++ b/3rdparty/libprocess/src/tests/io_tests.cpp
@@ -61,7 +61,7 @@ TEST(IO, Read)
   ASSERT_SOME(os::nonblock(pipes[1]));
 
   // Test reading nothing.
-  AWAIT_EXPECT_FAILED(io::read(pipes[0], data, 0));
+  AWAIT_EXPECT_EQ(0, io::read(pipes[0], data, 0));
 
   // Test successful read.
   Future<size_t> future = io::read(pipes[0], data, 3);
@@ -162,3 +162,108 @@ TEST(IO, BufferedRead)
 
   ASSERT_SOME(os::rm("file"));
 }
+
+
+TEST(IO, Write)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  int pipes[2];
+
+  // Create a blocking pipe.
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  // Test on a blocking file descriptor.
+  AWAIT_EXPECT_FAILED(io::write(pipes[1], (void*) "hi", 2));
+
+  close(pipes[0]);
+  close(pipes[1]);
+
+  // Test on a closed file descriptor.
+  AWAIT_EXPECT_FAILED(io::write(pipes[1], (void*) "hi", 2));
+
+  // Create a nonblocking pipe.
+  ASSERT_NE(-1, ::pipe(pipes));
+  ASSERT_SOME(os::nonblock(pipes[0]));
+  ASSERT_SOME(os::nonblock(pipes[1]));
+
+  // Test writing nothing.
+  AWAIT_EXPECT_EQ(0, io::write(pipes[1], (void*) "hi", 0));
+
+  // Test successful write.
+  AWAIT_EXPECT_EQ(2, io::write(pipes[1], (void*) "hi", 2));
+
+  char data[2];
+  AWAIT_EXPECT_EQ(2, io::read(pipes[0], data, 2));
+  EXPECT_EQ("hi", string(data, 2));
+
+  // Test write to broken pipe.
+  close(pipes[0]);
+  AWAIT_EXPECT_FAILED(io::write(pipes[1], (void*) "hi", 2));
+
+  close(pipes[1]);
+}
+
+
+TEST(IO, BlockingWrite)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  int pipes[2];
+
+  // Create a nonblocking pipe.
+  ASSERT_NE(-1, ::pipe(pipes));
+  ASSERT_SOME(os::nonblock(pipes[0]));
+  ASSERT_SOME(os::nonblock(pipes[1]));
+
+  // Determine the pipe buffer size by writing until we block.
+  size_t size = 0;
+  ssize_t length = 0;
+  while ((length = ::write(pipes[1], "data", 4)) >= 0) {
+    size += length;
+  }
+
+  ASSERT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK);
+
+  close(pipes[0]);
+  close(pipes[1]);
+
+  // Recreate a nonblocking pipe.
+  ASSERT_NE(-1, ::pipe(pipes));
+  ASSERT_SOME(os::nonblock(pipes[0]));
+  ASSERT_SOME(os::nonblock(pipes[1]));
+
+  // Create 8 pipe buffers worth of data. Try and write all the data
+  // at once. Check that the future is pending after doing the
+  // write. Then read 128 bytes and make sure the write remains
+  // pending.
+
+  string data = "data"; // 4 Bytes.
+  ASSERT_EQ(4u, data.size());
+
+  while (data.size() < (8 * size)) {
+    data.append(data);
+  }
+
+  Future<Nothing> future = io::write(pipes[1], data);
+
+  ASSERT_TRUE(future.isPending());
+
+  // Check after reading some data the write remains pending.
+  char temp[128];
+  AWAIT_EXPECT_EQ(128, io::read(pipes[0], temp, 128));
+
+  ASSERT_TRUE(future.isPending());
+
+  length = 128;
+
+  while (length < data.size()) {
+    AWAIT_EXPECT_EQ(128, io::read(pipes[0], temp, 128));
+    length += 128;
+  }
+
+  AWAIT_EXPECT_READY(future);
+
+  close(pipes[0]);
+  close(pipes[1]);
+}


[4/5] git commit: Added io::splice (C++11).

Posted by be...@apache.org.
Added io::splice (C++11).

Review: https://reviews.apache.org/r/17620


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f42e6080
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f42e6080
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f42e6080

Branch: refs/heads/master
Commit: f42e60804a4e345319cace9db46856f53a40a69c
Parents: 61ec702
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jan 31 13:38:00 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Feb 5 10:58:36 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/io.hpp |  7 +++
 3rdparty/libprocess/src/process.cpp        | 48 ++++++++++++++
 3rdparty/libprocess/src/tests/io_tests.cpp | 83 +++++++++++++++++++++++++
 3 files changed, 138 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f42e6080/3rdparty/libprocess/include/process/io.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp
index d16ab01..7f9b242 100644
--- a/3rdparty/libprocess/include/process/io.hpp
+++ b/3rdparty/libprocess/include/process/io.hpp
@@ -57,6 +57,13 @@ Future<size_t> write(int fd, void* data, size_t size);
 // written or an error occured in which case a failure is returned.
 Future<Nothing> write(int fd, const std::string& data);
 
+
+// Splices data from one file descriptor to another. Returns when
+// end-of-file is reached on the input file descriptor or returns a
+// failure if an error occurred while reading or writing. Note that
+// both the 'from' and 'to' file descriptors must be non-blocking.
+Future<Nothing> splice(int from, int to, size_t chunk = 4096);
+
 } // namespace io {
 } // namespace process {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f42e6080/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index bc111c6..e422384 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -3775,6 +3775,36 @@ Future<Nothing> _write(
 }
 #endif // __cplusplus >= 201103L
 
+
+#if __cplusplus >= 201103L
+void _splice(
+    int from,
+    int to,
+    size_t chunk,
+    const boost::shared_array<char>& data,
+    Owned<Promise<Nothing>> promise)
+{
+  // Note that only one of io::read or io::write is outstanding at any
+  // one point in time thus the reuse of 'data' for both operations.
+  io::read(from, data.get(), chunk)
+    .onReady([=] (size_t size) {
+      if (size == 0) { // EOF.
+        promise->set(Nothing());
+      } else {
+        io::write(to, string(data.get(), size))
+          .onReady([=] () {
+            _splice(from, to, chunk, data, promise);
+          })
+          .onFailed([=] (const string& message) { promise->fail(message); })
+          .onDiscarded([=] () { promise->future().discard(); });
+      }
+    })
+    .onFailed([=] (const string& message) { promise->fail(message); })
+    .onDiscarded([=] () { promise->future().discard(); });
+}
+#else
+#endif // __cplusplus >= 201103L
+
 } // namespace internal
 
 
@@ -3798,6 +3828,24 @@ Future<Nothing> write(int fd, const std::string& data)
   return internal::_write(fd, Owned<string>(new string(data)), 0);
 }
 
+
+Future<Nothing> splice(int from, int to, size_t chunk)
+{
+  boost::shared_array<char> data(new char[chunk]);
+
+  // Rather than having internal::_splice return a future and
+  // implementing internal::_splice as a chain of io::read and
+  // io::write calls, we use an explicit promise that we pass around
+  // so that we don't increase memory usage the longer that we splice.
+  Owned<Promise<Nothing> > promise(new Promise<Nothing>());
+
+  Future<Nothing> future = promise->future();
+
+  internal::_splice(from, to, chunk, data, promise);
+
+  return future;
+}
+
 } // namespace io {
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f42e6080/3rdparty/libprocess/src/tests/io_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp
index edf7066..eab7864 100644
--- a/3rdparty/libprocess/src/tests/io_tests.cpp
+++ b/3rdparty/libprocess/src/tests/io_tests.cpp
@@ -267,3 +267,86 @@ TEST(IO, BlockingWrite)
   close(pipes[0]);
   close(pipes[1]);
 }
+
+
+TEST(IO, splice)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // Create a temporary file for splicing into.
+  Try<string> path = os::mktemp();
+  ASSERT_SOME(path);
+
+  Try<int> fd = os::open(
+      path.get(),
+      O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+  ASSERT_SOME(fd);
+
+  ASSERT_SOME(os::nonblock(fd.get()));
+
+  // Use a pipe for doing the splicing.
+  int pipes[2];
+
+  // Start with a blocking pipe.
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  // Test splicing on a blocking file descriptor.
+  AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
+
+  close(pipes[0]);
+  close(pipes[1]);
+
+  // Test on a closed file descriptor.
+  AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
+
+  // Now create a nonblocking pipe.
+  ASSERT_NE(-1, ::pipe(pipes));
+  ASSERT_SOME(os::nonblock(pipes[0]));
+  ASSERT_SOME(os::nonblock(pipes[1]));
+
+  // Test write to broken pipe.
+  close(pipes[0]);
+  AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
+
+  close(pipes[1]);
+
+  // Recreate a nonblocking pipe.
+  ASSERT_NE(-1, ::pipe(pipes));
+  ASSERT_SOME(os::nonblock(pipes[0]));
+  ASSERT_SOME(os::nonblock(pipes[1]));
+
+  // Now write data to the pipe and splice to the file.
+
+  string data =
+    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
+    "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+    "aliquip ex ea commodo consequat. Duis aute irure dolor in "
+    "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
+    "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
+    "culpa qui officia deserunt mollit anim id est laborum.";
+
+  // Create more data!
+  while (Bytes(data.size()) < Megabytes(1)) {
+    data.append(data);
+  }
+
+  Future<Nothing> spliced = io::splice(pipes[0], fd.get());
+
+  AWAIT_READY(io::write(pipes[1], data));
+
+  // Closing the write pipe should cause an EOF on the read end, thus
+  // completing 'spliced'.
+  close(pipes[1]);
+
+  AWAIT_READY(spliced);
+
+  close(pipes[0]);
+
+  os::close(fd.get());
+
+  // Now make sure all the data is there!
+  Try<string> read = os::read(path.get());
+  ASSERT_SOME(read);
+  EXPECT_EQ(data, read.get());
+}


[3/5] git commit: Properly initialized signal sets.

Posted by be...@apache.org.
Properly initialized signal sets.

Review: https://reviews.apache.org/r/17617


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e24f5416
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e24f5416
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e24f5416

Branch: refs/heads/master
Commit: e24f54161b4701a8c383943709ae1524c6ca7903
Parents: 129ad59
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jan 31 12:03:30 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Feb 5 10:58:36 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e24f5416/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp
index 3b4e014..f32130a 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp
@@ -38,6 +38,7 @@ inline bool pending(int signal)
 inline bool block(int signal)
 {
   sigset_t set;
+  sigemptyset(&set);
   sigaddset(&set, signal);
 
   sigset_t oldset;
@@ -56,6 +57,7 @@ inline bool block(int signal)
 inline bool unblock(int signal)
 {
   sigset_t set;
+  sigemptyset(&set);
   sigaddset(&set, signal);
 
   sigset_t oldset;