You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/01/09 03:27:21 UTC
[4/5] mesos git commit: Introduced ControlFlow for process::loop.
Introduced ControlFlow for process::loop.
Review: https://reviews.apache.org/r/54358
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bbb4058d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bbb4058d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bbb4058d
Branch: refs/heads/master
Commit: bbb4058d60b50b54bcc626c25285c993ae4d8a3e
Parents: 63fe4b0
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Dec 4 18:42:58 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Jan 7 23:22:24 2017 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/loop.hpp | 214 ++++++++++++++++++----
3rdparty/libprocess/src/http.cpp | 23 ++-
3rdparty/libprocess/src/io.cpp | 21 ++-
3rdparty/libprocess/src/tests/loop_tests.cpp | 39 ++--
4 files changed, 233 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bbb4058d/3rdparty/libprocess/include/process/loop.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/loop.hpp b/3rdparty/libprocess/include/process/loop.hpp
index b35f7e6..53f6243 100644
--- a/3rdparty/libprocess/include/process/loop.hpp
+++ b/3rdparty/libprocess/include/process/loop.hpp
@@ -91,21 +91,48 @@ namespace process {
// And now what this looks like using `loop`:
//
// loop(pid,
-// []() { return iterate(); },
+// []() {
+// return iterate();
+// },
// [](T t) {
// return body(t);
// });
-template <typename Iterate, typename Body>
-Future<Nothing> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body);
+//
+// One difference between the `loop` version of the "body" versus the
+// other non-loop examples above is the return value is not `bool` or
+// `Future<bool>` but rather `ControlFlow<V>` or
+// `Future<ControlFlow<V>>`. This enables you to return values out of
+// the loop via a `Break(...)`, for example:
+//
+// loop(pid,
+// []() {
+// return iterate();
+// },
+// [](T t) {
+// if (finished(t)) {
+// return Break(SomeValue());
+// }
+// return Continue();
+// });
+template <typename Iterate,
+ typename Body,
+ typename T = typename internal::unwrap<typename result_of<Iterate()>::type>::type, // NOLINT(whitespace/line_length)
+ typename CF = typename internal::unwrap<typename result_of<Body(T)>::type>::type, // NOLINT(whitespace/line_length)
+ typename V = typename CF::ValueType>
+Future<V> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body);
// A helper for `loop` which creates a Process for us to provide an
// execution context for running the loop.
-template <typename Iterate, typename Body>
-Future<Nothing> loop(Iterate&& iterate, Body&& body)
+template <typename Iterate,
+ typename Body,
+ typename T = typename internal::unwrap<typename result_of<Iterate()>::type>::type, // NOLINT(whitespace/line_length)
+ typename CF = typename internal::unwrap<typename result_of<Body(T)>::type>::type, // NOLINT(whitespace/line_length)
+ typename V = typename CF::ValueType>
+Future<V> loop(Iterate&& iterate, Body&& body)
{
ProcessBase* process = new ProcessBase();
- return loop(
+ return loop<Iterate, Body, T, CF, V>(
spawn(process, true), // Have libprocess free `process`.
std::forward<Iterate>(iterate),
std::forward<Body>(body))
@@ -117,10 +144,117 @@ Future<Nothing> loop(Iterate&& iterate, Body&& body)
}
+// Generic "control flow" construct that is leveraged by
+// implementations such as `loop`. At a high-level a `ControlFlow`
+// represents some control flow statement such as `continue` or
+// `break`, however, these statements can both have values or be
+// value-less (i.e., these are meant to be composed "functionally" so
+// the representation of `break` captures a value that "exits the
+// current function" but the representation of `continue` does not).
+//
+// The pattern here is to define the type/representation of control
+// flow statements within the `ControlFlow` class (e.g.,
+// `ControlFlow::Continue` and `ControlFlow::Break`) but also provide
+// "syntactic sugar" to make it easier to use at the call site (e.g.,
+// the functions `Continue()` and `Break(...)`).
+template <typename T>
+class ControlFlow
+{
+public:
+ using ValueType = T;
+
+ enum class Statement
+ {
+ CONTINUE,
+ BREAK
+ };
+
+ class Continue
+ {
+ public:
+ Continue() = default;
+
+ template <typename U>
+ operator ControlFlow<U>() const
+ {
+ return ControlFlow<U>(ControlFlow<U>::Statement::CONTINUE, None());
+ }
+ };
+
+ class Break
+ {
+ public:
+ Break(T t) : t(std::move(t)) {}
+
+ template <typename U>
+ operator ControlFlow<U>() const &
+ {
+ return ControlFlow<U>(ControlFlow<U>::Statement::BREAK, t);
+ }
+
+ template <typename U>
+ operator ControlFlow<U>() &&
+ {
+ return ControlFlow<U>(ControlFlow<U>::Statement::BREAK, std::move(t));
+ }
+
+ private:
+ T t;
+ };
+
+ Statement statement() const { return s; }
+
+ T& value() & { return t.get(); }
+ const T& value() const & { return t.get(); }
+ T&& value() && { return t.get(); }
+ const T&& value() const && { return t.get(); }
+
+private:
+ template <typename U>
+ friend class ControlFlow<U>::Continue;
+ template <typename U>
+ friend class ControlFlow<U>::Break;
+
+ ControlFlow(Statement s, Option<T> t)
+ : s(s), t(std::move(t)) {}
+
+ Statement s;
+ Option<T> t;
+};
+
+
+// Provides "syntactic sugar" for creating a `ControlFlow::Continue`.
+struct Continue
+{
+ Continue() = default;
+
+ template <typename T>
+ operator ControlFlow<T>() const
+ {
+ return typename ControlFlow<T>::Continue();
+ }
+};
+
+
+// Provides "syntactic sugar" for creating a `ControlFlow::Break`.
+template <typename T>
+typename ControlFlow<typename std::decay<T>::type>::Break Break(T&& t)
+{
+ return typename ControlFlow<typename std::decay<T>::type>::Break(
+ std::forward<T>(t));
+}
+
+
+inline ControlFlow<Nothing>::Break Break()
+{
+ return ControlFlow<Nothing>::Break(Nothing());
+}
+
+
namespace internal {
-template <typename Iterate, typename Body, typename T>
-class Loop : public std::enable_shared_from_this<Loop<Iterate, Body, T>>
+template <typename Iterate, typename Body, typename T, typename R>
+class Loop : public std::enable_shared_from_this<Loop<Iterate, Body, T, R>>
{
public:
Loop(const Option<UPID>& pid, const Iterate& iterate, const Body& body)
@@ -140,7 +274,7 @@ public:
return std::weak_ptr<Loop>(shared());
}
- Future<Nothing> start()
+ Future<R> start()
{
auto self = shared();
auto weak_self = weak();
@@ -200,39 +334,47 @@ public:
discard = []() {};
while (next.isReady()) {
- Future<bool> condition = body(next.get());
- if (condition.isReady()) {
- if (condition.get()) {
- next = iterate();
- continue;
- } else {
- promise.set(Nothing());
- return;
+ Future<ControlFlow<R>> flow = body(next.get());
+ if (flow.isReady()) {
+ switch (flow->statement()) {
+ case ControlFlow<R>::Statement::CONTINUE: {
+ next = iterate();
+ continue;
+ }
+ case ControlFlow<R>::Statement::BREAK: {
+ promise.set(flow->value());
+ return;
+ }
}
} else {
- auto continuation = [self](const Future<bool>& condition) {
- if (condition.isReady()) {
- if (condition.get()) {
- self->run(self->iterate());
- } else {
- self->promise.set(Nothing());
+ auto continuation = [self](const Future<ControlFlow<R>>& flow) {
+ if (flow.isReady()) {
+ switch (flow->statement()) {
+ case ControlFlow<R>::Statement::CONTINUE: {
+ self->run(self->iterate());
+ break;
+ }
+ case ControlFlow<R>::Statement::BREAK: {
+ self->promise.set(flow->value());
+ break;
+ }
}
- } else if (condition.isFailed()) {
- self->promise.fail(condition.failure());
- } else if (condition.isDiscarded()) {
+ } else if (flow.isFailed()) {
+ self->promise.fail(flow.failure());
+ } else if (flow.isDiscarded()) {
self->promise.discard();
}
};
if (pid.isSome()) {
- condition.onAny(defer(pid.get(), continuation));
+ flow.onAny(defer(pid.get(), continuation));
} else {
- condition.onAny(continuation);
+ flow.onAny(continuation);
}
if (!promise.future().hasDiscard()) {
synchronized (mutex) {
- self->discard = [=]() mutable { condition.discard(); };
+ self->discard = [=]() mutable { flow.discard(); };
}
}
@@ -242,7 +384,7 @@ public:
// discard occurs we'll need to explicitly do discards for
// each new future that blocks.
if (promise.future().hasDiscard()) {
- condition.discard();
+ flow.discard();
}
return;
@@ -282,7 +424,7 @@ private:
const Option<UPID> pid;
Iterate iterate;
Body body;
- Promise<Nothing> promise;
+ Promise<R> promise;
// In order to discard the loop safely we capture the future that
// needs to be discarded within the `discard` function and reading
@@ -294,16 +436,14 @@ private:
} // namespace internal {
-template <typename Iterate, typename Body>
-Future<Nothing> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body)
+template <typename Iterate, typename Body, typename T, typename CF, typename V>
+Future<V> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body)
{
- using T =
- typename internal::unwrap<typename result_of<Iterate()>::type>::type;
-
using Loop = internal::Loop<
typename std::decay<Iterate>::type,
typename std::decay<Body>::type,
- T>;
+ T,
+ V>;
std::shared_ptr<Loop> loop(
new Loop(pid, std::forward<Iterate>(iterate), std::forward<Body>(body)));
http://git-wip-us.apache.org/repos/asf/mesos/blob/bbb4058d/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 97d1424..689a14d 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -1607,9 +1607,9 @@ Future<Nothing> send(
[=]() mutable {
return pipeline.get();
},
- [=](const Option<Item>& item) -> Future<bool> {
+ [=](const Option<Item>& item) -> Future<ControlFlow<Nothing>> {
if (item.isNone()) {
- return false;
+ return Break();
}
Request* request = item->request;
@@ -1638,7 +1638,7 @@ Future<Nothing> send(
case Response::NONE: return send(socket, response, request);
}
}()
- .then([=]() {
+ .then([=]() -> ControlFlow<Nothing> {
// Persist the connection if the request expects it and
// the response doesn't include 'Connection: close'.
bool persist = request->keepAlive;
@@ -1647,7 +1647,10 @@ Future<Nothing> send(
persist = false;
}
}
- return persist;
+ if (persist) {
+ return Continue();
+ }
+ return Break();
});
})
.onAny([=]() {
@@ -1678,9 +1681,9 @@ Future<Nothing> receive(
[=]() {
return socket.recv(data, size);
},
- [=](size_t length) mutable -> Future<bool> {
+ [=](size_t length) mutable -> Future<ControlFlow<Nothing>> {
if (length == 0) {
- return false;
+ return Break();
}
// Decode as much of the data as possible into HTTP requests.
@@ -1706,7 +1709,7 @@ Future<Nothing> receive(
pipeline.put(Item{request, f(*request)});
}
- return true; // Keep looping!
+ return Continue(); // Keep looping!
})
.onAny([=]() {
delete decoder;
@@ -1794,15 +1797,15 @@ Future<Nothing> serve(
[=]() mutable {
return pipeline.get();
},
- [=](Option<Item> item) {
+ [=](Option<Item> item) -> ControlFlow<Nothing> {
if (item.isNone()) {
- return false;
+ return Break();
}
delete item->request;
if (promise->future().hasDiscard()) {
item->response.discard();
}
- return true;
+ return Continue();
});
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/bbb4058d/3rdparty/libprocess/src/io.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
index 27da897..d0b3ba1 100644
--- a/3rdparty/libprocess/src/io.cpp
+++ b/3rdparty/libprocess/src/io.cpp
@@ -329,9 +329,9 @@ Future<Nothing> splice(
[=]() {
return io::read(from, data.get(), chunk);
},
- [=](size_t length) -> Future<bool> {
+ [=](size_t length) -> Future<ControlFlow<Nothing>> {
if (length == 0) { // EOF.
- return false;
+ return Break();
}
// Send the data to the redirect hooks.
@@ -341,8 +341,8 @@ Future<Nothing> splice(
}
return io::write(to, s)
- .then([]() {
- return true;
+ .then([]() -> Future<ControlFlow<Nothing>> {
+ return Continue();
});
});
}
@@ -444,7 +444,13 @@ Future<Nothing> write(int fd, const string& data)
nonblock.error());
}
+ // We store `data.size()` so that we can just use `size` in the
+ // second lambda below versus having to make a copy of `data` in
+ // both lambdas since `data` might be very big and two copies could
+ // be expensive!
const size_t size = data.size();
+
+ // We need to share the `index` between both lambdas below.
std::shared_ptr<size_t> index(new size_t(0));
return loop(
@@ -452,8 +458,11 @@ Future<Nothing> write(int fd, const string& data)
[=]() {
return io::write(fd, data.data() + *index, size - *index);
},
- [=](size_t length) {
- return (*index += length) != size;
+ [=](size_t length) -> ControlFlow<Nothing> {
+ if ((*index += length) != size) {
+ return Continue();
+ }
+ return Break();
})
.onAny([fd]() {
os::close(fd);
http://git-wip-us.apache.org/repos/asf/mesos/blob/bbb4058d/3rdparty/libprocess/src/tests/loop_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/loop_tests.cpp b/3rdparty/libprocess/src/tests/loop_tests.cpp
index 8435ba8..8d1837a 100644
--- a/3rdparty/libprocess/src/tests/loop_tests.cpp
+++ b/3rdparty/libprocess/src/tests/loop_tests.cpp
@@ -13,17 +13,23 @@
#include <gmock/gmock.h>
#include <atomic>
+#include <string>
#include <process/future.hpp>
#include <process/gtest.hpp>
#include <process/loop.hpp>
#include <process/queue.hpp>
+using process::Break;
+using process::Continue;
+using process::ControlFlow;
using process::Future;
using process::loop;
using process::Promise;
using process::Queue;
+using std::string;
+
TEST(LoopTest, Sync)
{
@@ -33,8 +39,11 @@ TEST(LoopTest, Sync)
[&]() {
return value.load();
},
- [](int i) {
- return i != 0;
+ [](int i) -> ControlFlow<Nothing> {
+ if (i != 0) {
+ return Continue();
+ }
+ return Break();
});
EXPECT_TRUE(future.isPending());
@@ -50,15 +59,18 @@ TEST(LoopTest, Async)
Queue<int> queue;
Promise<int> promise1;
- Promise<bool> promise2;
+ Promise<string> promise2;
- Future<Nothing> future = loop(
+ Future<string> future = loop(
[&]() {
return queue.get();
},
[&](int i) {
promise1.set(i);
- return promise2.future();
+ return promise2.future()
+ .then([](const string& s) -> ControlFlow<string> {
+ return Break(s);
+ });
});
EXPECT_TRUE(future.isPending());
@@ -69,9 +81,11 @@ TEST(LoopTest, Async)
EXPECT_TRUE(future.isPending());
- promise2.set(false);
+ string s = "Hello world!";
- AWAIT_READY(future);
+ promise2.set(s);
+
+ AWAIT_EQ(s, future);
}
@@ -85,8 +99,8 @@ TEST(LoopTest, DiscardIterate)
[&]() {
return promise.future();
},
- [&](int i) {
- return false;
+ [&](int i) -> ControlFlow<Nothing> {
+ return Break();
});
EXPECT_TRUE(future.isPending());
@@ -100,7 +114,7 @@ TEST(LoopTest, DiscardIterate)
TEST(LoopTest, DiscardBody)
{
- Promise<bool> promise;
+ Promise<Nothing> promise;
promise.future().onDiscard([&]() { promise.discard(); });
@@ -109,7 +123,10 @@ TEST(LoopTest, DiscardBody)
return 42;
},
[&](int i) {
- return promise.future();
+ return promise.future()
+ .then([]() -> ControlFlow<Nothing> {
+ return Break();
+ });
});
EXPECT_TRUE(future.isPending());