You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/09/27 01:01:43 UTC

[3/3] mesos git commit: Added `process::Executor::execute()`.

Added `process::Executor::execute()`.

This patch adds a convenient interface to `process::Executor` to
asynchronously execute arbitrary functions.

Review: https://reviews.apache.org/r/62252/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bf82953f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bf82953f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bf82953f

Branch: refs/heads/1.4.x
Commit: bf82953f1ede7ddf182f9cad79a3248ef2630dc8
Parents: f82cd43
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Mon Sep 25 14:10:27 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Sep 26 17:30:12 2017 -0700

----------------------------------------------------------------------
 .../libprocess/include/process/executor.hpp     | 46 ++++++++++-
 3rdparty/libprocess/src/tests/process_tests.cpp | 85 +++++++++++++++++---
 2 files changed, 115 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bf82953f/3rdparty/libprocess/include/process/executor.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/executor.hpp b/3rdparty/libprocess/include/process/executor.hpp
index cd2f2f0..95a9e69 100644
--- a/3rdparty/libprocess/include/process/executor.hpp
+++ b/3rdparty/libprocess/include/process/executor.hpp
@@ -17,12 +17,15 @@
 #include <process/id.hpp>
 #include <process/process.hpp>
 
+#include <stout/nothing.hpp>
+#include <stout/result_of.hpp>
+
 namespace process {
 
 // Provides an abstraction that can take a standard function object
-// and defer it without needing a process. Each converted function
-// object will get execute serially with respect to one another when
-// invoked.
+// and defer or asynchronously execute it without needing a process.
+// Each converted function object will get executed serially with respect
+// to one another when invoked.
 class Executor
 {
 public:
@@ -39,7 +42,7 @@ public:
 
   void stop()
   {
-    terminate(&process);
+    terminate(process);
 
     // TODO(benh): Note that this doesn't wait because that could
     // cause a deadlock ... thus, the semantics here are that no more
@@ -60,6 +63,41 @@ private:
   Executor& operator=(const Executor&);
 
   ProcessBase process;
+
+
+public:
+  template <
+      typename F,
+      typename R = typename result_of<F()>::type,
+      typename std::enable_if<!std::is_void<R>::value, int>::type = 0>
+  auto execute(F&& f)
+    -> decltype(dispatch(process, std::function<R()>(std::forward<F>(f))))
+  {
+    // NOTE: Currently we cannot pass a mutable lambda into `dispatch()`
+    // because it would be captured by copy, so we convert `f` into a
+    // `std::function` to bypass this restriction.
+    return dispatch(process, std::function<R()>(std::forward<F>(f)));
+  }
+
+  // NOTE: This overload for `void` returns `Future<Nothing>` so we can
+  // chain. This follows the same behavior as `async()`.
+  template <
+      typename F,
+      typename R = typename result_of<F()>::type,
+      typename std::enable_if<std::is_void<R>::value, int>::type = 0>
+  Future<Nothing> execute(F&& f)
+  {
+    // NOTE: Currently we cannot pass a mutable lambda into `dispatch()`
+    // because it would be captured by copy, so we convert `f` into a
+    // `std::function` to bypass this restriction. This wrapper also
+    // avoids `f` being evaluated when it is a nested bind.
+    // TODO(chhsiao): Capture `f` by forwarding once we switch to C++14.
+    return dispatch(
+        process,
+        std::bind(
+            [](const std::function<R()>& f_) { f_(); return Nothing(); },
+            std::function<R()>(std::forward<F>(f))));
+  }
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bf82953f/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 82efb2f..e494318 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -31,6 +31,7 @@
 
 #include <process/async.hpp>
 #include <process/clock.hpp>
+#include <process/count_down_latch.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
@@ -70,6 +71,7 @@ namespace inet4 = process::network::inet4;
 
 using process::async;
 using process::Clock;
+using process::CountDownLatch;
 using process::defer;
 using process::Deferred;
 using process::Event;
@@ -102,6 +104,7 @@ using std::vector;
 using testing::_;
 using testing::Assign;
 using testing::DoAll;
+using testing::InvokeWithoutArgs;
 using testing::Return;
 using testing::ReturnArg;
 
@@ -1201,20 +1204,17 @@ public:
 };
 
 
-TEST(ProcessTest, THREADSAFE_Executor)
+TEST(ProcessTest, THREADSAFE_Executor_Defer)
 {
-  std::atomic_bool event1Called(false);
-  std::atomic_bool event2Called(false);
-
   EventReceiver receiver;
+  Executor executor;
 
-  EXPECT_CALL(receiver, event1(42))
-    .WillOnce(Assign(&event1Called, true));
-
-  EXPECT_CALL(receiver, event2("event2"))
-    .WillOnce(Assign(&event2Called, true));
+  CountDownLatch event1Called;
 
-  Executor executor;
+  EXPECT_CALL(receiver, event1(42))
+    .WillOnce(InvokeWithoutArgs([&]() {
+      event1Called.decrement();
+    }));
 
   Deferred<void(int)> event1 =
     executor.defer([&receiver](int i) {
@@ -1223,6 +1223,15 @@ TEST(ProcessTest, THREADSAFE_Executor)
 
   event1(42);
 
+  AWAIT_READY(event1Called.triggered());
+
+  CountDownLatch event2Called;
+
+  EXPECT_CALL(receiver, event2("event2"))
+    .WillOnce(InvokeWithoutArgs([&]() {
+      event2Called.decrement();
+    }));
+
   Deferred<void(const string&)> event2 =
     executor.defer([&receiver](const string& s) {
       return receiver.event2(s);
@@ -1230,8 +1239,60 @@ TEST(ProcessTest, THREADSAFE_Executor)
 
   event2("event2");
 
-  while (event1Called.load() == false);
-  while (event2Called.load() == false);
+  AWAIT_READY(event2Called.triggered());
+}
+
+
+TEST(ProcessTest, THREADSAFE_Executor_Execute)
+{
+  Executor executor;
+
+  // A void immutable lambda.
+  CountDownLatch f1Result;
+  auto f1 = [&f1Result] {
+    f1Result.decrement();
+  };
+
+  // Specify the return type explicitly for type checking. Same below.
+  Future<Nothing> f1Called = executor.execute(f1);
+
+  AWAIT_READY(f1Called);
+  AWAIT_READY(f1Result.triggered());
+
+  // A void mutable bind.
+  CountDownLatch f2Result;
+  int f2State = 0;
+  auto f2 = [&f2Result, f2State](int) mutable -> void {
+    f2State++;
+    f2Result.decrement();
+  };
+
+  Future<Nothing> f2Called = executor.execute(std::bind(f2, 42));
+
+  AWAIT_READY(f2Called);
+  AWAIT_READY(f2Result.triggered());
+
+  // A non-void immutable lambda.
+  const string f3Result = "f3";
+  auto f3 = [&f3Result] {
+    return f3Result;
+  };
+
+  Future<string> f3Called = executor.execute(f3);
+
+  AWAIT_EXPECT_EQ(f3Result, f3Called);
+
+  // A mutable bind returning a future.
+  const string f4Result = "f4";
+  int f4State = 0;
+  auto f4 = [&f4Result, f4State](int) mutable -> Future<string> {
+    f4State++;
+    return f4Result;
+  };
+
+  Future<string> f4Called = executor.execute(std::bind(f4, 42));
+
+  AWAIT_EXPECT_EQ(f4Result, f4Called);
 }