You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/08/16 03:31:36 UTC
[4/7] git commit: Improved safety of "higher-level" io::read/write.
Improved safety of "higher-level" io::read/write.
These functions tend to read and write "everything" and are often used
with things like Subprocess. A common pattern is that someone will
read from Subprocess::err/out but not hold on to the Subprocess
instance which will cause the underlying file descriptors in
Subprocess to get closed before the io::read/write has completed. To
make this safer, we do what we did with io::redirect and automatically
duplicate the file descriptor.
This was also a good opportunity to remove io::splice from the public
interface since it's not being used and is inherently dangerous.
Review: https://reviews.apache.org/r/24755
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a8c37d46
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a8c37d46
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a8c37d46
Branch: refs/heads/master
Commit: a8c37d4661c820353b5d622e2d5f9beb9cd1b090
Parents: 02a35ab
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Aug 15 15:07:10 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Aug 15 18:22:01 2014 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/io.hpp | 16 +--
3rdparty/libprocess/src/process.cpp | 100 +++++++++++++++----
3rdparty/libprocess/src/tests/io_tests.cpp | 89 -----------------
.../libprocess/src/tests/subprocess_tests.cpp | 4 +-
4 files changed, 93 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a8c37d46/3rdparty/libprocess/include/process/io.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp
index c775290..6388770 100644
--- a/3rdparty/libprocess/include/process/io.hpp
+++ b/3rdparty/libprocess/include/process/io.hpp
@@ -40,6 +40,10 @@ Future<size_t> read(int fd, void* data, size_t size);
// Performs a series of asynchronous reads, until EOF is reached.
// NOTE: When using this, ensure the sender will close the connection
// so that EOF can be reached.
+//
+// NOTE: the specified file descriptor is duplicated, and the duplicate
+// is set to close-on-exec and made non-blocking; a failure is returned
+// if any of these operations can not be performed.
Future<std::string> read(int fd);
@@ -55,22 +59,18 @@ Future<size_t> write(int fd, void* data, size_t size);
// Performs a series of asynchronous writes until all of data has been
// written or an error occured in which case a failure is returned.
+//
+// NOTE: the specified file descriptor is duplicated, and the duplicate
+// is set to close-on-exec and made non-blocking; a failure is returned
+// if any of these operations can not be performed.
Future<Nothing> write(int fd, const std::string& data);
-// Splices data from one file descriptor to another. Returns when
-// end-of-file is reached on the input file descriptor or returns a
-// failure if an error occurred while reading or writing. Note that
-// both the 'from' and 'to' file descriptors must be non-blocking.
-Future<Nothing> splice(int from, int to, size_t chunk = 4096);
-
-
// Redirect output from 'from' file descriptor to 'to' file descriptor
// or /dev/null if 'to' is None. Note that depending on how we
// redirect output we duplicate the 'from' and 'to' file descriptors
// so we can control their lifetimes. Returns after EOF has been hit
// on 'from' or some form of failure has occured.
-// TODO(benh): Consider subsuming lower-level 'splice'.
Future<Nothing> redirect(int from, Option<int> to, size_t chunk = 4096);
} // namespace io {
http://git-wip-us.apache.org/repos/asf/mesos/blob/a8c37d46/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index d403423..ddcedb7 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -4183,6 +4183,24 @@ void ____splice(
}
#endif // __cplusplus >= 201103L
+
+Future<Nothing> splice(int from, int to, size_t chunk)
+{
+ boost::shared_array<char> data(new char[chunk]);
+
+ // Rather than having internal::_splice return a future and
+ // implementing internal::_splice as a chain of io::read and
+ // io::write calls, we use an explicit promise that we pass around
+ // so that we don't increase memory usage the longer that we splice.
+ memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
+
+ Future<Nothing> future = promise->future();
+
+ _splice(from, to, chunk, data, promise);
+
+ return future;
+}
+
} // namespace internal {
@@ -4190,12 +4208,45 @@ Future<string> read(int fd)
{
process::initialize();
+ // Get our own copy of the file descriptor so that we're in control
+ // of the lifetime and don't crash if/when someone accidentally
+ // closes the file descriptor before discarding this future. We can
+ // also make sure it's non-blocking and will close-on-exec. Start by
+ // checking we've got a "valid" file descriptor before dup'ing.
+ if (fd < 0) {
+ return Failure(strerror(EBADF));
+ }
+
+ fd = dup(fd);
+ if (fd == -1) {
+ return Failure(ErrnoError("Failed to duplicate file descriptor"));
+ }
+
+ // Set the close-on-exec flag.
+ Try<Nothing> cloexec = os::cloexec(fd);
+ if (cloexec.isError()) {
+ os::close(fd);
+ return Failure(
+ "Failed to set close-on-exec on duplicated file descriptor: " +
+ cloexec.error());
+ }
+
+ // Make the file descriptor non-blocking.
+ Try<Nothing> nonblock = os::nonblock(fd);
+ if (nonblock.isError()) {
+ os::close(fd);
+ return Failure(
+ "Failed to make duplicated file descriptor non-blocking: " +
+ nonblock.error());
+ }
+
// TODO(benh): Wrap up this data as a struct, use 'Owner'.
// TODO(bmahler): For efficiency, use a rope for the buffer.
memory::shared_ptr<string> buffer(new string());
boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
- return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE);
+ return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE)
+ .onAny(lambda::bind(&os::close, fd));
}
@@ -4203,25 +4254,40 @@ Future<Nothing> write(int fd, const std::string& data)
{
process::initialize();
- return internal::_write(fd, Owned<string>(new string(data)), 0);
-}
-
-
-Future<Nothing> splice(int from, int to, size_t chunk)
-{
- boost::shared_array<char> data(new char[chunk]);
+ // Get our own copy of the file descriptor so that we're in control
+ // of the lifetime and don't crash if/when someone accidentally
+ // closes the file descriptor before discarding this future. We can
+ // also make sure it's non-blocking and will close-on-exec. Start by
+ // checking we've got a "valid" file descriptor before dup'ing.
+ if (fd < 0) {
+ return Failure(strerror(EBADF));
+ }
- // Rather than having internal::_splice return a future and
- // implementing internal::_splice as a chain of io::read and
- // io::write calls, we use an explicit promise that we pass around
- // so that we don't increase memory usage the longer that we splice.
- memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
+ fd = dup(fd);
+ if (fd == -1) {
+ return Failure(ErrnoError("Failed to duplicate file descriptor"));
+ }
- Future<Nothing> future = promise->future();
+ // Set the close-on-exec flag.
+ Try<Nothing> cloexec = os::cloexec(fd);
+ if (cloexec.isError()) {
+ os::close(fd);
+ return Failure(
+ "Failed to set close-on-exec on duplicated file descriptor: " +
+ cloexec.error());
+ }
- internal::_splice(from, to, chunk, data, promise);
+ // Make the file descriptor non-blocking.
+ Try<Nothing> nonblock = os::nonblock(fd);
+ if (nonblock.isError()) {
+ os::close(fd);
+ return Failure(
+ "Failed to make duplicated file descriptor non-blocking: " +
+ nonblock.error());
+ }
- return future;
+ return internal::_write(fd, Owned<string>(new string(data)), 0)
+ .onAny(lambda::bind(&os::close, fd));
}
@@ -4289,7 +4355,7 @@ Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
return Failure("Failed to make 'to' non-blocking: " + nonblock.error());
}
- return splice(from, to.get(), chunk)
+ return internal::splice(from, to.get(), chunk)
.onAny(lambda::bind(&os::close, from))
.onAny(lambda::bind(&os::close, to.get()));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a8c37d46/3rdparty/libprocess/src/tests/io_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp
index 05ea7bb..1102b1a 100644
--- a/3rdparty/libprocess/src/tests/io_tests.cpp
+++ b/3rdparty/libprocess/src/tests/io_tests.cpp
@@ -290,95 +290,6 @@ TEST(IO, DISABLED_BlockingWrite)
}
-TEST(IO, splice)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
-
- // Create a temporary file for splicing into.
- Try<string> path = os::mktemp();
- ASSERT_SOME(path);
-
- Try<int> fd = os::open(
- path.get(),
- O_WRONLY | O_CREAT | O_TRUNC,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
-
- ASSERT_SOME(fd);
-
- ASSERT_SOME(os::nonblock(fd.get()));
-
- // Use a pipe for doing the splicing.
- int pipes[2];
-
- // Start with a blocking pipe.
- ASSERT_NE(-1, ::pipe(pipes));
-
- // Test splicing on a blocking file descriptor.
- AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
-
- ASSERT_SOME(os::close(pipes[0]));
- ASSERT_SOME(os::close(pipes[1]));
-
- // Test on a closed file descriptor.
- AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
-
- // Now create a nonblocking pipe.
- ASSERT_NE(-1, ::pipe(pipes));
- ASSERT_SOME(os::nonblock(pipes[0]));
- ASSERT_SOME(os::nonblock(pipes[1]));
-
- // Test write to broken pipe.
- ASSERT_SOME(os::close(pipes[0]));
- AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
-
- close(pipes[1]);
-
- // Recreate a nonblocking pipe.
- ASSERT_NE(-1, ::pipe(pipes));
- ASSERT_SOME(os::nonblock(pipes[0]));
- ASSERT_SOME(os::nonblock(pipes[1]));
-
- // Test discard.
- Future<Nothing> splice = io::splice(pipes[0], fd.get());
- EXPECT_TRUE(splice.isPending());
- splice.discard();
- AWAIT_DISCARDED(splice);
-
- // Now write data to the pipe and splice to the file.
- string data =
- "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
- "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
- "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
- "aliquip ex ea commodo consequat. Duis aute irure dolor in "
- "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
- "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
- "culpa qui officia deserunt mollit anim id est laborum.";
-
- // Create more data!
- while (Bytes(data.size()) < Megabytes(1)) {
- data.append(data);
- }
-
- splice = io::splice(pipes[0], fd.get());
-
- AWAIT_READY(io::write(pipes[1], data));
-
- // Closing the write pipe should cause an EOF on the read end, thus
- // completing 'splice'.
- ASSERT_SOME(os::close(pipes[1]));
-
- AWAIT_READY(splice);
-
- ASSERT_SOME(os::close(pipes[0]));
- ASSERT_SOME(os::close(fd.get()));
-
- // Now make sure all the data is there!
- Try<string> read = os::read(path.get());
- ASSERT_SOME(read);
- EXPECT_EQ(data, read.get());
-}
-
-
TEST(IO, redirect)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
http://git-wip-us.apache.org/repos/asf/mesos/blob/a8c37d46/3rdparty/libprocess/src/tests/subprocess_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
index 98a4e44..5fec289 100644
--- a/3rdparty/libprocess/src/tests/subprocess_tests.cpp
+++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
@@ -209,7 +209,7 @@ TEST_F(SubprocessTest, PipeInput)
}
-TEST_F(SubprocessTest, PipeSplice)
+TEST_F(SubprocessTest, PipeRedirect)
{
Clock::pause();
@@ -234,7 +234,7 @@ TEST_F(SubprocessTest, PipeSplice)
ASSERT_SOME(s.get().out());
ASSERT_SOME(os::nonblock(s.get().out().get()));
- AWAIT_READY(io::splice(s.get().out().get(), fd.get()));
+ AWAIT_READY(io::redirect(s.get().out().get(), fd.get()));
// Advance time until the internal reaper reaps the subprocess.
while (s.get().status().isPending()) {