You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/09/27 01:01:43 UTC
[3/3] mesos git commit: Added `process::Executor::execute()`.
Added `process::Executor::execute()`.
This patch adds a convenient interface to `process::Executor` to
asynchronously execute arbitrary functions.
Review: https://reviews.apache.org/r/62252/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bf82953f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bf82953f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bf82953f
Branch: refs/heads/1.4.x
Commit: bf82953f1ede7ddf182f9cad79a3248ef2630dc8
Parents: f82cd43
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Mon Sep 25 14:10:27 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Sep 26 17:30:12 2017 -0700
----------------------------------------------------------------------
.../libprocess/include/process/executor.hpp | 46 ++++++++++-
3rdparty/libprocess/src/tests/process_tests.cpp | 85 +++++++++++++++++---
2 files changed, 115 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bf82953f/3rdparty/libprocess/include/process/executor.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/executor.hpp b/3rdparty/libprocess/include/process/executor.hpp
index cd2f2f0..95a9e69 100644
--- a/3rdparty/libprocess/include/process/executor.hpp
+++ b/3rdparty/libprocess/include/process/executor.hpp
@@ -17,12 +17,15 @@
#include <process/id.hpp>
#include <process/process.hpp>
+#include <stout/nothing.hpp>
+#include <stout/result_of.hpp>
+
namespace process {
// Provides an abstraction that can take a standard function object
-// and defer it without needing a process. Each converted function
-// object will get execute serially with respect to one another when
-// invoked.
+// and defer or asynchronously execute it without needing a process.
+// Each converted function object will get executed serially with respect
+// to one another when invoked.
class Executor
{
public:
@@ -39,7 +42,7 @@ public:
void stop()
{
- terminate(&process);
+ terminate(process);
// TODO(benh): Note that this doesn't wait because that could
// cause a deadlock ... thus, the semantics here are that no more
@@ -60,6 +63,41 @@ private:
Executor& operator=(const Executor&);
ProcessBase process;
+
+
+public:
+ template <
+ typename F,
+ typename R = typename result_of<F()>::type,
+ typename std::enable_if<!std::is_void<R>::value, int>::type = 0>
+ auto execute(F&& f)
+ -> decltype(dispatch(process, std::function<R()>(std::forward<F>(f))))
+ {
+ // NOTE: Currently we cannot pass a mutable lambda into `dispatch()`
+ // because it would be captured by copy, so we convert `f` into a
+ // `std::function` to bypass this restriction.
+ return dispatch(process, std::function<R()>(std::forward<F>(f)));
+ }
+
+ // NOTE: This overload for `void` returns `Future<Nothing>` so we can
+ // chain. This follows the same behavior as `async()`.
+ template <
+ typename F,
+ typename R = typename result_of<F()>::type,
+ typename std::enable_if<std::is_void<R>::value, int>::type = 0>
+ Future<Nothing> execute(F&& f)
+ {
+ // NOTE: Currently we cannot pass a mutable lambda into `dispatch()`
+ // because it would be captured by copy, so we convert `f` into a
+ // `std::function` to bypass this restriction. This wrapper also
+ // avoids `f` being evaluated when it is a nested bind.
+ // TODO(chhsiao): Capture `f` by forwarding once we switch to C++14.
+ return dispatch(
+ process,
+ std::bind(
+ [](const std::function<R()>& f_) { f_(); return Nothing(); },
+ std::function<R()>(std::forward<F>(f))));
+ }
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/bf82953f/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 82efb2f..e494318 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -31,6 +31,7 @@
#include <process/async.hpp>
#include <process/clock.hpp>
+#include <process/count_down_latch.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
@@ -70,6 +71,7 @@ namespace inet4 = process::network::inet4;
using process::async;
using process::Clock;
+using process::CountDownLatch;
using process::defer;
using process::Deferred;
using process::Event;
@@ -102,6 +104,7 @@ using std::vector;
using testing::_;
using testing::Assign;
using testing::DoAll;
+using testing::InvokeWithoutArgs;
using testing::Return;
using testing::ReturnArg;
@@ -1201,20 +1204,17 @@ public:
};
-TEST(ProcessTest, THREADSAFE_Executor)
+TEST(ProcessTest, THREADSAFE_Executor_Defer)
{
- std::atomic_bool event1Called(false);
- std::atomic_bool event2Called(false);
-
EventReceiver receiver;
+ Executor executor;
- EXPECT_CALL(receiver, event1(42))
- .WillOnce(Assign(&event1Called, true));
-
- EXPECT_CALL(receiver, event2("event2"))
- .WillOnce(Assign(&event2Called, true));
+ CountDownLatch event1Called;
- Executor executor;
+ EXPECT_CALL(receiver, event1(42))
+ .WillOnce(InvokeWithoutArgs([&]() {
+ event1Called.decrement();
+ }));
Deferred<void(int)> event1 =
executor.defer([&receiver](int i) {
@@ -1223,6 +1223,15 @@ TEST(ProcessTest, THREADSAFE_Executor)
event1(42);
+ AWAIT_READY(event1Called.triggered());
+
+ CountDownLatch event2Called;
+
+ EXPECT_CALL(receiver, event2("event2"))
+ .WillOnce(InvokeWithoutArgs([&]() {
+ event2Called.decrement();
+ }));
+
Deferred<void(const string&)> event2 =
executor.defer([&receiver](const string& s) {
return receiver.event2(s);
@@ -1230,8 +1239,60 @@ TEST(ProcessTest, THREADSAFE_Executor)
event2("event2");
- while (event1Called.load() == false);
- while (event2Called.load() == false);
+ AWAIT_READY(event2Called.triggered());
+}
+
+
+TEST(ProcessTest, THREADSAFE_Executor_Execute)
+{
+ Executor executor;
+
+ // A void immutable lambda.
+ CountDownLatch f1Result;
+ auto f1 = [&f1Result] {
+ f1Result.decrement();
+ };
+
+ // Specify the return type explicitly for type checking. Same below.
+ Future<Nothing> f1Called = executor.execute(f1);
+
+ AWAIT_READY(f1Called);
+ AWAIT_READY(f1Result.triggered());
+
+ // A void mutable bind.
+ CountDownLatch f2Result;
+ int f2State = 0;
+ auto f2 = [&f2Result, f2State](int) mutable -> void {
+ f2State++;
+ f2Result.decrement();
+ };
+
+ Future<Nothing> f2Called = executor.execute(std::bind(f2, 42));
+
+ AWAIT_READY(f2Called);
+ AWAIT_READY(f2Result.triggered());
+
+ // A non-void immutable lambda.
+ const string f3Result = "f3";
+ auto f3 = [&f3Result] {
+ return f3Result;
+ };
+
+ Future<string> f3Called = executor.execute(f3);
+
+ AWAIT_EXPECT_EQ(f3Result, f3Called);
+
+ // A mutable bind returning a future.
+ const string f4Result = "f4";
+ int f4State = 0;
+ auto f4 = [&f4Result, f4State](int) mutable -> Future<string> {
+ f4State++;
+ return f4Result;
+ };
+
+ Future<string> f4Called = executor.execute(std::bind(f4, 42));
+
+ AWAIT_EXPECT_EQ(f4Result, f4Called);
}