You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2016/12/01 18:39:29 UTC

[4/7] mesos git commit: Updated 'io::redirect()' to take an optional vector of callback hooks.

Updated 'io::redirect()' to take an optional vector of callback hooks.

These callback hooks will be invoked before passing any data read from
the 'from' file descriptor on to the 'to' file descriptor.

Review: https://reviews.apache.org/r/54053


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b7937a68
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b7937a68
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b7937a68

Branch: refs/heads/master
Commit: b7937a68367088f3c1f7c334307422c71737b1d7
Parents: c33ba20
Author: Kevin Klues <kl...@gmail.com>
Authored: Wed Nov 23 22:48:09 2016 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 1 10:11:45 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/io.hpp | 16 ++++++++---
 3rdparty/libprocess/src/io.cpp             | 36 ++++++++++++++++++++-----
 3rdparty/libprocess/src/tests/io_tests.cpp | 17 +++++++++---
 3 files changed, 56 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b7937a68/3rdparty/libprocess/include/process/io.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp
index eec5efd..f5489dc 100644
--- a/3rdparty/libprocess/include/process/io.hpp
+++ b/3rdparty/libprocess/include/process/io.hpp
@@ -115,7 +115,8 @@ Future<Nothing> write(int fd, const std::string& data);
 
 /**
  * Redirect output from the 'from' file descriptor to the 'to' file
- * descriptor (or /dev/null if 'to' is None).
+ * descriptor (or /dev/null if 'to' is None). Optionally call a vector
+ * of callback hooks, passing them the data before it is written to 'to'.
  *
  * The 'to' and 'from' file descriptors will be duplicated so that the
  * file descriptors' lifetimes can be controlled within this function.
@@ -125,10 +126,19 @@ Future<Nothing> write(int fd, const std::string& data);
  *     descriptor is bad, or if the file descriptor cannot be duplicated,
  *     set to close-on-exec, or made non-blocking.
  */
-Future<Nothing> redirect(int from, Option<int> to, size_t chunk = 4096);
+Future<Nothing> redirect(
+    int from,
+    Option<int> to,
+    size_t chunk = 4096,
+    const std::vector<lambda::function<void(const std::string&)>>& hooks = {});
+
 #ifdef __WINDOWS__
 // Version of this function compatible with Windows `HANDLE`.
-Future<Nothing> redirect(HANDLE from, Option<int> to, size_t chunk = 4096);
+Future<Nothing> redirect(
+    HANDLE from,
+    Option<int> to,
+    size_t chunk = 4096,
+    const std::vector<lambda::function<void(const std::string&)>>& hooks = {});
 #endif // __WINDOWS__
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b7937a68/3rdparty/libprocess/src/io.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
index d930cee..e81f279 100644
--- a/3rdparty/libprocess/src/io.cpp
+++ b/3rdparty/libprocess/src/io.cpp
@@ -28,6 +28,7 @@
 #include <stout/try.hpp>
 
 using std::string;
+using std::vector;
 
 namespace process {
 namespace io {
@@ -334,6 +335,7 @@ void _splice(
     int from,
     int to,
     size_t chunk,
+    const vector<lambda::function<void(const string&)>>& hooks,
     boost::shared_array<char> data,
     std::shared_ptr<Promise<Nothing>> promise)
 {
@@ -362,12 +364,19 @@ void _splice(
       if (size == 0) { // EOF.
         promise->set(Nothing());
       } else {
+        // Send the data to the redirect hooks.
+        foreach (
+            const lambda::function<void(const string&)>& hook,
+            hooks) {
+          hook(string(data.get(), size));
+        }
+
         // Note that we always try and complete the write, even if a
         // discard has occurred on our future, in order to provide
         // semantics where everything read is written. The promise
         // will eventually be discarded in the next read.
         io::write(to, string(data.get(), size))
-          .onReady([=]() { _splice(from, to, chunk, data, promise); })
+          .onReady([=]() { _splice(from, to, chunk, hooks, data, promise); })
           .onFailed([=](const string& message) { promise->fail(message); })
           .onDiscarded([=]() { promise->discard(); });
       }
@@ -377,7 +386,11 @@ void _splice(
 }
 
 
-Future<Nothing> splice(int from, int to, size_t chunk)
+Future<Nothing> splice(
+    int from,
+    int to,
+    size_t chunk,
+    const vector<lambda::function<void(const string&)>>& hooks)
 {
   boost::shared_array<char> data(new char[chunk]);
 
@@ -389,7 +402,7 @@ Future<Nothing> splice(int from, int to, size_t chunk)
 
   Future<Nothing> future = promise->future();
 
-  _splice(from, to, chunk, data, promise);
+  _splice(from, to, chunk, hooks, data, promise);
 
   return future;
 }
@@ -496,7 +509,11 @@ Future<Nothing> write(int fd, const string& data)
 }
 
 
-Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
+Future<Nothing> redirect(
+    int from,
+    Option<int> to,
+    size_t chunk,
+    const vector<lambda::function<void(const string&)>>& hooks)
 {
   // Make sure we've got "valid" file descriptors.
   if (from < 0 || (to.isSome() && to.get() < 0)) {
@@ -562,7 +579,7 @@ Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
   }
 
   // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
-  return internal::splice(from, to.get(), chunk)
+  return internal::splice(from, to.get(), chunk, hooks)
     .onAny([from]() { os::close(from); })
     .onAny([to]() { os::close(to.get()); });
 }
@@ -571,12 +588,17 @@ Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
 #ifdef __WINDOWS__
 // NOTE: Ordinarily this would go in a Windows-specific header; we put it here
 // to avoid complex forward declarations.
-Future<Nothing> redirect(HANDLE from, Option<int> to, size_t chunk)
+Future<Nothing> redirect(
+    HANDLE from,
+    Option<int> to,
+    size_t chunk,
+    const vector<lambda::function<void(const string&)>>& hooks)
 {
   return redirect(
       _open_osfhandle(reinterpret_cast<intptr_t>(from), O_RDWR),
       to,
-      chunk);
+      chunk,
+      hooks);
 }
 #endif // __WINDOWS__
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b7937a68/3rdparty/libprocess/src/tests/io_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp
index 66b610e..26365a5 100644
--- a/3rdparty/libprocess/src/tests/io_tests.cpp
+++ b/3rdparty/libprocess/src/tests/io_tests.cpp
@@ -322,7 +322,14 @@ TEST_F(IOTest, Redirect)
   ASSERT_SOME(os::nonblock(pipes[0]));
   ASSERT_SOME(os::nonblock(pipes[1]));
 
-  // Now write data to the pipe and splice to the file.
+  // Set up a redirect hook to also accumulate the data that we splice.
+  string accumulated = "";
+  lambda::function<void(const string&)> hook =
+    [&accumulated](const string& data) {
+      accumulated += data;
+    };
+
+  // Now write data to the pipe and splice to the file and the redirect hook.
   string data =
     "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
     "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
@@ -337,7 +344,7 @@ TEST_F(IOTest, Redirect)
     data.append(data);
   }
 
-  Future<Nothing> redirect = io::redirect(pipes[0], fd.get());
+  Future<Nothing> redirect = io::redirect(pipes[0], fd.get(), 256, {hook});
 
   // Closing the read end of the pipe and the file should not have any
   // impact as we dup the file descriptor.
@@ -358,10 +365,14 @@ TEST_F(IOTest, Redirect)
 
   AWAIT_READY(redirect);
 
-  // Now make sure all the data is there!
+  // Now make sure all the data is in the file!
   Try<string> read = os::read(path.get());
   ASSERT_SOME(read);
   EXPECT_EQ(data, read.get());
+
+  // Also make sure the data was properly
+  // accumulated in the redirect hook.
+  EXPECT_EQ(data, accumulated);
 }