You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2016/12/01 18:39:29 UTC
[4/7] mesos git commit: Updated 'io::redirect()' to take an optional
vector of callback hooks.
Updated 'io::redirect()' to take an optional vector of callback hooks.
These callback hooks will be invoked before passing any data read from
the 'from' file descriptor on to the 'to' file descriptor.
Review: https://reviews.apache.org/r/54053
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b7937a68
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b7937a68
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b7937a68
Branch: refs/heads/master
Commit: b7937a68367088f3c1f7c334307422c71737b1d7
Parents: c33ba20
Author: Kevin Klues <kl...@gmail.com>
Authored: Wed Nov 23 22:48:09 2016 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 1 10:11:45 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/io.hpp | 16 ++++++++---
3rdparty/libprocess/src/io.cpp | 36 ++++++++++++++++++++-----
3rdparty/libprocess/src/tests/io_tests.cpp | 17 +++++++++---
3 files changed, 56 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b7937a68/3rdparty/libprocess/include/process/io.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp
index eec5efd..f5489dc 100644
--- a/3rdparty/libprocess/include/process/io.hpp
+++ b/3rdparty/libprocess/include/process/io.hpp
@@ -115,7 +115,8 @@ Future<Nothing> write(int fd, const std::string& data);
/**
* Redirect output from the 'from' file descriptor to the 'to' file
- * descriptor (or /dev/null if 'to' is None).
+ * descriptor (or /dev/null if 'to' is None). Optionally call a vector
+ * of callback hooks, passing them the data before it is written to 'to'.
*
* The 'to' and 'from' file descriptors will be duplicated so that the
* file descriptors' lifetimes can be controlled within this function.
@@ -125,10 +126,19 @@ Future<Nothing> write(int fd, const std::string& data);
* descriptor is bad, or if the file descriptor cannot be duplicated,
* set to close-on-exec, or made non-blocking.
*/
-Future<Nothing> redirect(int from, Option<int> to, size_t chunk = 4096);
+Future<Nothing> redirect(
+ int from,
+ Option<int> to,
+ size_t chunk = 4096,
+ const std::vector<lambda::function<void(const std::string&)>>& hooks = {});
+
#ifdef __WINDOWS__
// Version of this function compatible with Windows `HANDLE`.
-Future<Nothing> redirect(HANDLE from, Option<int> to, size_t chunk = 4096);
+Future<Nothing> redirect(
+ HANDLE from,
+ Option<int> to,
+ size_t chunk = 4096,
+ const std::vector<lambda::function<void(const std::string&)>>& hooks = {});
#endif // __WINDOWS__
http://git-wip-us.apache.org/repos/asf/mesos/blob/b7937a68/3rdparty/libprocess/src/io.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
index d930cee..e81f279 100644
--- a/3rdparty/libprocess/src/io.cpp
+++ b/3rdparty/libprocess/src/io.cpp
@@ -28,6 +28,7 @@
#include <stout/try.hpp>
using std::string;
+using std::vector;
namespace process {
namespace io {
@@ -334,6 +335,7 @@ void _splice(
int from,
int to,
size_t chunk,
+ const vector<lambda::function<void(const string&)>>& hooks,
boost::shared_array<char> data,
std::shared_ptr<Promise<Nothing>> promise)
{
@@ -362,12 +364,19 @@ void _splice(
if (size == 0) { // EOF.
promise->set(Nothing());
} else {
+ // Send the data to the redirect hooks.
+ foreach (
+ const lambda::function<void(const string&)>& hook,
+ hooks) {
+ hook(string(data.get(), size));
+ }
+
// Note that we always try and complete the write, even if a
// discard has occurred on our future, in order to provide
// semantics where everything read is written. The promise
// will eventually be discarded in the next read.
io::write(to, string(data.get(), size))
- .onReady([=]() { _splice(from, to, chunk, data, promise); })
+ .onReady([=]() { _splice(from, to, chunk, hooks, data, promise); })
.onFailed([=](const string& message) { promise->fail(message); })
.onDiscarded([=]() { promise->discard(); });
}
@@ -377,7 +386,11 @@ void _splice(
}
-Future<Nothing> splice(int from, int to, size_t chunk)
+Future<Nothing> splice(
+ int from,
+ int to,
+ size_t chunk,
+ const vector<lambda::function<void(const string&)>>& hooks)
{
boost::shared_array<char> data(new char[chunk]);
@@ -389,7 +402,7 @@ Future<Nothing> splice(int from, int to, size_t chunk)
Future<Nothing> future = promise->future();
- _splice(from, to, chunk, data, promise);
+ _splice(from, to, chunk, hooks, data, promise);
return future;
}
@@ -496,7 +509,11 @@ Future<Nothing> write(int fd, const string& data)
}
-Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
+Future<Nothing> redirect(
+ int from,
+ Option<int> to,
+ size_t chunk,
+ const vector<lambda::function<void(const string&)>>& hooks)
{
// Make sure we've got "valid" file descriptors.
if (from < 0 || (to.isSome() && to.get() < 0)) {
@@ -562,7 +579,7 @@ Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
}
// NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
- return internal::splice(from, to.get(), chunk)
+ return internal::splice(from, to.get(), chunk, hooks)
.onAny([from]() { os::close(from); })
.onAny([to]() { os::close(to.get()); });
}
@@ -571,12 +588,17 @@ Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
#ifdef __WINDOWS__
// NOTE: Ordinarily this would go in a Windows-specific header; we put it here
// to avoid complex forward declarations.
-Future<Nothing> redirect(HANDLE from, Option<int> to, size_t chunk)
+Future<Nothing> redirect(
+ HANDLE from,
+ Option<int> to,
+ size_t chunk,
+ const vector<lambda::function<void(const string&)>>& hooks)
{
return redirect(
_open_osfhandle(reinterpret_cast<intptr_t>(from), O_RDWR),
to,
- chunk);
+ chunk,
+ hooks);
}
#endif // __WINDOWS__
http://git-wip-us.apache.org/repos/asf/mesos/blob/b7937a68/3rdparty/libprocess/src/tests/io_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp
index 66b610e..26365a5 100644
--- a/3rdparty/libprocess/src/tests/io_tests.cpp
+++ b/3rdparty/libprocess/src/tests/io_tests.cpp
@@ -322,7 +322,14 @@ TEST_F(IOTest, Redirect)
ASSERT_SOME(os::nonblock(pipes[0]));
ASSERT_SOME(os::nonblock(pipes[1]));
- // Now write data to the pipe and splice to the file.
+ // Set up a redirect hook to also accumulate the data that we splice.
+ string accumulated = "";
+ lambda::function<void(const string&)> hook =
+ [&accumulated](const string& data) {
+ accumulated += data;
+ };
+
+ // Now write data to the pipe and splice to the file and the redirect hook.
string data =
"Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
"eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
@@ -337,7 +344,7 @@ TEST_F(IOTest, Redirect)
data.append(data);
}
- Future<Nothing> redirect = io::redirect(pipes[0], fd.get());
+ Future<Nothing> redirect = io::redirect(pipes[0], fd.get(), 256, {hook});
// Closing the read end of the pipe and the file should not have any
// impact as we dup the file descriptor.
@@ -358,10 +365,14 @@ TEST_F(IOTest, Redirect)
AWAIT_READY(redirect);
- // Now make sure all the data is there!
+ // Now make sure all the data is in the file!
Try<string> read = os::read(path.get());
ASSERT_SOME(read);
EXPECT_EQ(data, read.get());
+
+ // Also make sure the data was properly
+ // accumulated in the redirect hook.
+ EXPECT_EQ(data, accumulated);
}