You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/08/16 03:31:36 UTC

[4/7] git commit: Improved safety of "higher-level" io::read/write.

Improved safety of "higher-level" io::read/write.

These functions tend to read and write "everything" and are often used
with things like Subprocess. A common pattern is that someone will
read from Subprocess::err/out but not hold on to the Subprocess
instance, which causes the underlying file descriptors in Subprocess
to be closed before the io::read/write has completed. To make this
safer, we do what io::redirect already does and automatically
duplicate the file descriptor (also making it close-on-exec and
non-blocking).

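This was also a good opportunity to remove io::splice from the public
interface, since it's not being used and is inherently dangerous: it
does not duplicate its file descriptors, so callers must keep both of
them open (and non-blocking) until the splice completes.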

Review: https://reviews.apache.org/r/24755


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a8c37d46
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a8c37d46
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a8c37d46

Branch: refs/heads/master
Commit: a8c37d4661c820353b5d622e2d5f9beb9cd1b090
Parents: 02a35ab
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Aug 15 15:07:10 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Aug 15 18:22:01 2014 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/io.hpp      |  16 +--
 3rdparty/libprocess/src/process.cpp             | 100 +++++++++++++++----
 3rdparty/libprocess/src/tests/io_tests.cpp      |  89 -----------------
 .../libprocess/src/tests/subprocess_tests.cpp   |   4 +-
 4 files changed, 93 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a8c37d46/3rdparty/libprocess/include/process/io.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp
index c775290..6388770 100644
--- a/3rdparty/libprocess/include/process/io.hpp
+++ b/3rdparty/libprocess/include/process/io.hpp
@@ -40,6 +40,10 @@ Future<size_t> read(int fd, void* data, size_t size);
 // Performs a series of asynchronous reads, until EOF is reached.
 // NOTE: When using this, ensure the sender will close the connection
 // so that EOF can be reached.
+//
+// NOTE: the specified file descriptor is duplicated, set to
+// close-on-exec, and made non-blocking; a failure is returned if
+// any of these operations cannot be performed.
 Future<std::string> read(int fd);
 
 
@@ -55,22 +59,18 @@ Future<size_t> write(int fd, void* data, size_t size);
 
 // Performs a series of asynchronous writes until all of data has been
 // written or an error occured in which case a failure is returned.
+//
+// NOTE: the specified file descriptor is duplicated, set to
+// close-on-exec, and made non-blocking; a failure is returned if
+// any of these operations cannot be performed.
 Future<Nothing> write(int fd, const std::string& data);
 
 
-// Splices data from one file descriptor to another. Returns when
-// end-of-file is reached on the input file descriptor or returns a
-// failure if an error occurred while reading or writing. Note that
-// both the 'from' and 'to' file descriptors must be non-blocking.
-Future<Nothing> splice(int from, int to, size_t chunk = 4096);
-
-
 // Redirect output from 'from' file descriptor to 'to' file descriptor
 // or /dev/null if 'to' is None. Note that depending on how we
 // redirect output we duplicate the 'from' and 'to' file descriptors
 // so we can control their lifetimes. Returns after EOF has been hit
 // on 'from' or some form of failure has occured.
-// TODO(benh): Consider subsuming lower-level 'splice'.
 Future<Nothing> redirect(int from, Option<int> to, size_t chunk = 4096);
 
 } // namespace io {

http://git-wip-us.apache.org/repos/asf/mesos/blob/a8c37d46/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index d403423..ddcedb7 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -4183,6 +4183,24 @@ void ____splice(
 }
 #endif // __cplusplus >= 201103L
 
+
+Future<Nothing> splice(int from, int to, size_t chunk)
+{
+  boost::shared_array<char> data(new char[chunk]);
+
+  // Rather than having internal::_splice return a future and
+  // implementing internal::_splice as a chain of io::read and
+  // io::write calls, we use an explicit promise that we pass around
+  // so that we don't increase memory usage the longer that we splice.
+  memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
+
+  Future<Nothing> future = promise->future();
+
+  _splice(from, to, chunk, data, promise);
+
+  return future;
+}
+
 } // namespace internal {
 
 
@@ -4190,12 +4208,45 @@ Future<string> read(int fd)
 {
   process::initialize();
 
+  // Get our own copy of the file descriptor so that we're in control
+  // of its lifetime and don't crash if/when someone accidentally
+  // closes the original descriptor while this future is pending. We
+  // can also make sure it's non-blocking and close-on-exec. Start by
+  // checking we've got a "valid" file descriptor before dup'ing.
+  if (fd < 0) {
+    return Failure(strerror(EBADF));
+  }
+
+  fd = dup(fd);
+  if (fd == -1) {
+    return Failure(ErrnoError("Failed to duplicate file descriptor"));
+  }
+
+  // Set the close-on-exec flag.
+  Try<Nothing> cloexec = os::cloexec(fd);
+  if (cloexec.isError()) {
+    os::close(fd);
+    return Failure(
+        "Failed to set close-on-exec on duplicated file descriptor: " +
+        cloexec.error());
+  }
+
+  // Make the file descriptor non-blocking.
+  Try<Nothing> nonblock = os::nonblock(fd);
+  if (nonblock.isError()) {
+    os::close(fd);
+    return Failure(
+        "Failed to make duplicated file descriptor non-blocking: " +
+        nonblock.error());
+  }
+
   // TODO(benh): Wrap up this data as a struct, use 'Owner'.
   // TODO(bmahler): For efficiency, use a rope for the buffer.
   memory::shared_ptr<string> buffer(new string());
   boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
 
-  return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE);
+  return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE)
+    .onAny(lambda::bind(&os::close, fd));
 }
 
 
@@ -4203,25 +4254,40 @@ Future<Nothing> write(int fd, const std::string& data)
 {
   process::initialize();
 
-  return internal::_write(fd, Owned<string>(new string(data)), 0);
-}
-
-
-Future<Nothing> splice(int from, int to, size_t chunk)
-{
-  boost::shared_array<char> data(new char[chunk]);
+  // Get our own copy of the file descriptor so that we're in control
+  // of its lifetime and don't crash if/when someone accidentally
+  // closes the original descriptor while this future is pending. We
+  // can also make sure it's non-blocking and close-on-exec. Start by
+  // checking we've got a "valid" file descriptor before dup'ing.
+  if (fd < 0) {
+    return Failure(strerror(EBADF));
+  }
 
-  // Rather than having internal::_splice return a future and
-  // implementing internal::_splice as a chain of io::read and
-  // io::write calls, we use an explicit promise that we pass around
-  // so that we don't increase memory usage the longer that we splice.
-  memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
+  fd = dup(fd);
+  if (fd == -1) {
+    return Failure(ErrnoError("Failed to duplicate file descriptor"));
+  }
 
-  Future<Nothing> future = promise->future();
+  // Set the close-on-exec flag.
+  Try<Nothing> cloexec = os::cloexec(fd);
+  if (cloexec.isError()) {
+    os::close(fd);
+    return Failure(
+        "Failed to set close-on-exec on duplicated file descriptor: " +
+        cloexec.error());
+  }
 
-  internal::_splice(from, to, chunk, data, promise);
+  // Make the file descriptor non-blocking.
+  Try<Nothing> nonblock = os::nonblock(fd);
+  if (nonblock.isError()) {
+    os::close(fd);
+    return Failure(
+        "Failed to make duplicated file descriptor non-blocking: " +
+        nonblock.error());
+  }
 
-  return future;
+  return internal::_write(fd, Owned<string>(new string(data)), 0)
+    .onAny(lambda::bind(&os::close, fd));
 }
 
 
@@ -4289,7 +4355,7 @@ Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
     return Failure("Failed to make 'to' non-blocking: " + nonblock.error());
   }
 
-  return splice(from, to.get(), chunk)
+  return internal::splice(from, to.get(), chunk)
     .onAny(lambda::bind(&os::close, from))
     .onAny(lambda::bind(&os::close, to.get()));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/a8c37d46/3rdparty/libprocess/src/tests/io_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp
index 05ea7bb..1102b1a 100644
--- a/3rdparty/libprocess/src/tests/io_tests.cpp
+++ b/3rdparty/libprocess/src/tests/io_tests.cpp
@@ -290,95 +290,6 @@ TEST(IO, DISABLED_BlockingWrite)
 }
 
 
-TEST(IO, splice)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
-
-  // Create a temporary file for splicing into.
-  Try<string> path = os::mktemp();
-  ASSERT_SOME(path);
-
-  Try<int> fd = os::open(
-      path.get(),
-      O_WRONLY | O_CREAT | O_TRUNC,
-      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
-
-  ASSERT_SOME(fd);
-
-  ASSERT_SOME(os::nonblock(fd.get()));
-
-  // Use a pipe for doing the splicing.
-  int pipes[2];
-
-  // Start with a blocking pipe.
-  ASSERT_NE(-1, ::pipe(pipes));
-
-  // Test splicing on a blocking file descriptor.
-  AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
-
-  ASSERT_SOME(os::close(pipes[0]));
-  ASSERT_SOME(os::close(pipes[1]));
-
-  // Test on a closed file descriptor.
-  AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
-
-  // Now create a nonblocking pipe.
-  ASSERT_NE(-1, ::pipe(pipes));
-  ASSERT_SOME(os::nonblock(pipes[0]));
-  ASSERT_SOME(os::nonblock(pipes[1]));
-
-  // Test write to broken pipe.
-  ASSERT_SOME(os::close(pipes[0]));
-  AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
-
-  close(pipes[1]);
-
-  // Recreate a nonblocking pipe.
-  ASSERT_NE(-1, ::pipe(pipes));
-  ASSERT_SOME(os::nonblock(pipes[0]));
-  ASSERT_SOME(os::nonblock(pipes[1]));
-
-  // Test discard.
-  Future<Nothing> splice = io::splice(pipes[0], fd.get());
-  EXPECT_TRUE(splice.isPending());
-  splice.discard();
-  AWAIT_DISCARDED(splice);
-
-  // Now write data to the pipe and splice to the file.
-  string data =
-    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
-    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
-    "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
-    "aliquip ex ea commodo consequat. Duis aute irure dolor in "
-    "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
-    "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
-    "culpa qui officia deserunt mollit anim id est laborum.";
-
-  // Create more data!
-  while (Bytes(data.size()) < Megabytes(1)) {
-    data.append(data);
-  }
-
-  splice = io::splice(pipes[0], fd.get());
-
-  AWAIT_READY(io::write(pipes[1], data));
-
-  // Closing the write pipe should cause an EOF on the read end, thus
-  // completing 'splice'.
-  ASSERT_SOME(os::close(pipes[1]));
-
-  AWAIT_READY(splice);
-
-  ASSERT_SOME(os::close(pipes[0]));
-  ASSERT_SOME(os::close(fd.get()));
-
-  // Now make sure all the data is there!
-  Try<string> read = os::read(path.get());
-  ASSERT_SOME(read);
-  EXPECT_EQ(data, read.get());
-}
-
-
 TEST(IO, redirect)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);

http://git-wip-us.apache.org/repos/asf/mesos/blob/a8c37d46/3rdparty/libprocess/src/tests/subprocess_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
index 98a4e44..5fec289 100644
--- a/3rdparty/libprocess/src/tests/subprocess_tests.cpp
+++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
@@ -209,7 +209,7 @@ TEST_F(SubprocessTest, PipeInput)
 }
 
 
-TEST_F(SubprocessTest, PipeSplice)
+TEST_F(SubprocessTest, PipeRedirect)
 {
   Clock::pause();
 
@@ -234,7 +234,7 @@ TEST_F(SubprocessTest, PipeSplice)
 
   ASSERT_SOME(s.get().out());
   ASSERT_SOME(os::nonblock(s.get().out().get()));
-  AWAIT_READY(io::splice(s.get().out().get(), fd.get()));
+  AWAIT_READY(io::redirect(s.get().out().get(), fd.get()));
 
   // Advance time until the internal reaper reaps the subprocess.
   while (s.get().status().isPending()) {