You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/01/09 03:27:21 UTC

[4/5] mesos git commit: Introduced ControlFlow for process::loop.

Introduced ControlFlow for process::loop.

Review: https://reviews.apache.org/r/54358


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bbb4058d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bbb4058d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bbb4058d

Branch: refs/heads/master
Commit: bbb4058d60b50b54bcc626c25285c993ae4d8a3e
Parents: 63fe4b0
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Dec 4 18:42:58 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Jan 7 23:22:24 2017 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/loop.hpp | 214 ++++++++++++++++++----
 3rdparty/libprocess/src/http.cpp             |  23 ++-
 3rdparty/libprocess/src/io.cpp               |  21 ++-
 3rdparty/libprocess/src/tests/loop_tests.cpp |  39 ++--
 4 files changed, 233 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bbb4058d/3rdparty/libprocess/include/process/loop.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/loop.hpp b/3rdparty/libprocess/include/process/loop.hpp
index b35f7e6..53f6243 100644
--- a/3rdparty/libprocess/include/process/loop.hpp
+++ b/3rdparty/libprocess/include/process/loop.hpp
@@ -91,21 +91,48 @@ namespace process {
 // And now what this looks like using `loop`:
 //
 //     loop(pid,
-//          []() { return iterate(); },
+//          []() {
+//            return iterate();
+//          },
 //          [](T t) {
 //            return body(t);
 //          });
-template <typename Iterate, typename Body>
-Future<Nothing> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body);
+//
+// One difference between the `loop` version of the "body" and the
+// other non-loop examples above is that the return value is not
+// `bool` or `Future<bool>` but rather `ControlFlow<V>` or
+// `Future<ControlFlow<V>>`. This enables you to return values out of
+// the loop via a `Break(...)`, for example:
+//
+//     loop(pid,
+//          []() {
+//            return iterate();
+//          },
+//          [](T t) {
+//            if (finished(t)) {
+//              return Break(SomeValue());
+//            }
+//            return Continue();
+//          });
+template <typename Iterate,
+          typename Body,
+          typename T = typename internal::unwrap<typename result_of<Iterate()>::type>::type, // NOLINT(whitespace/line_length)
+          typename CF = typename internal::unwrap<typename result_of<Body(T)>::type>::type, // NOLINT(whitespace/line_length)
+          typename V = typename CF::ValueType>
+Future<V> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body);
 
 
 // A helper for `loop` which creates a Process for us to provide an
 // execution context for running the loop.
-template <typename Iterate, typename Body>
-Future<Nothing> loop(Iterate&& iterate, Body&& body)
+template <typename Iterate,
+          typename Body,
+          typename T = typename internal::unwrap<typename result_of<Iterate()>::type>::type, // NOLINT(whitespace/line_length)
+          typename CF = typename internal::unwrap<typename result_of<Body(T)>::type>::type, // NOLINT(whitespace/line_length)
+          typename V = typename CF::ValueType>
+Future<V> loop(Iterate&& iterate, Body&& body)
 {
   ProcessBase* process = new ProcessBase();
-  return loop(
+  return loop<Iterate, Body, T, CF, V>(
       spawn(process, true), // Have libprocess free `process`.
       std::forward<Iterate>(iterate),
       std::forward<Body>(body))
@@ -117,10 +144,117 @@ Future<Nothing> loop(Iterate&& iterate, Body&& body)
 }
 
 
+// Generic "control flow" construct that is leveraged by
+// implementations such as `loop`. At a high level, a `ControlFlow`
+// represents some control flow statement such as `continue` or
+// `break`; however, these statements can either carry a value or be
+// value-less (i.e., these are meant to be composed "functionally" so
+// the representation of `break` captures a value that "exits the
+// current function" but the representation of `continue` does not).
+//
+// The pattern here is to define the type/representation of control
+// flow statements within the `ControlFlow` class (e.g.,
+// `ControlFlow::Continue` and `ControlFlow::Break`) but also provide
+// "syntactic sugar" to make it easier to use at the call site (e.g.,
+// the functions `Continue()` and `Break(...)`).
+template <typename T>
+class ControlFlow
+{
+public:
+  using ValueType = T; // Type of the value carried by a `Break`.
+
+  // Which control flow statement this object represents.
+  enum class Statement { CONTINUE, BREAK };
+
+  // A value-less `continue`: converts to a `ControlFlow<U>` for any
+  // `U`, with the statement `CONTINUE` and `None()` as the value.
+  class Continue
+  {
+  public:
+    Continue() = default;
+
+    template <typename U>
+    operator ControlFlow<U>() const
+    {
+      return ControlFlow<U>(ControlFlow<U>::Statement::CONTINUE, None());
+    }
+  };
+
+  // A `break` that owns a `T`: converts to a `ControlFlow<U>` holding
+  // that value, copied from an lvalue `Break` or moved from an rvalue.
+  class Break
+  {
+  public:
+    Break(T t) : t(std::move(t)) {}
+
+    template <typename U>
+    operator ControlFlow<U>() const &
+    {
+      return ControlFlow<U>(ControlFlow<U>::Statement::BREAK, t);
+    }
+
+    template <typename U>
+    operator ControlFlow<U>() &&
+    {
+      return ControlFlow<U>(ControlFlow<U>::Statement::BREAK, std::move(t));
+    }
+
+  private:
+    T t;
+  };
+
+  Statement statement() const { return s; }
+
+  // The `Break` value (`t` is `None()` after a `Continue`). `t.get()`
+  // is an lvalue, so the rvalue overloads must `std::move` it.
+  T& value() & { return t.get(); }
+  const T& value() const & { return t.get(); }
+  T&& value() && { return std::move(t.get()); }
+  const T&& value() const && { return std::move(t.get()); }
+
+private:
+  template <typename U> friend class ControlFlow<U>::Continue;
+  template <typename U> friend class ControlFlow<U>::Break;
+
+  ControlFlow(Statement s, Option<T> t) : s(s), t(std::move(t)) {}
+
+  Statement s;
+  Option<T> t;
+};
+
+
+// "Syntactic sugar": `Continue()` converts to any `ControlFlow<V>`.
+struct Continue
+{
+  Continue() = default;
+
+  template <typename V>
+  operator ControlFlow<V>() const
+  {
+    return typename ControlFlow<V>::Continue{};
+  }
+};
+
+
+// "Syntactic sugar": forwards `t` into a `Break` for its decayed type.
+template <typename T>
+typename ControlFlow<typename std::decay<T>::type>::Break Break(T&& t)
+{
+  using Flow = ControlFlow<typename std::decay<T>::type>;
+  return typename Flow::Break(std::forward<T>(t));
+}
+
+
+inline ControlFlow<Nothing>::Break Break()
+{
+  return Break(Nothing()); // A value-less `break` carries a `Nothing`.
+}
+
+
 namespace internal {
 
-template <typename Iterate, typename Body, typename T>
-class Loop : public std::enable_shared_from_this<Loop<Iterate, Body, T>>
+template <typename Iterate, typename Body, typename T, typename R>
+class Loop : public std::enable_shared_from_this<Loop<Iterate, Body, T, R>>
 {
 public:
   Loop(const Option<UPID>& pid, const Iterate& iterate, const Body& body)
@@ -140,7 +274,7 @@ public:
     return std::weak_ptr<Loop>(shared());
   }
 
-  Future<Nothing> start()
+  Future<R> start()
   {
     auto self = shared();
     auto weak_self = weak();
@@ -200,39 +334,47 @@ public:
     discard = []() {};
 
     while (next.isReady()) {
-      Future<bool> condition = body(next.get());
-      if (condition.isReady()) {
-        if (condition.get()) {
-          next = iterate();
-          continue;
-        } else {
-          promise.set(Nothing());
-          return;
+      Future<ControlFlow<R>> flow = body(next.get());
+      if (flow.isReady()) {
+        switch (flow->statement()) {
+          case ControlFlow<R>::Statement::CONTINUE: {
+            next = iterate();
+            continue;
+          }
+          case ControlFlow<R>::Statement::BREAK: {
+            promise.set(flow->value());
+            return;
+          }
         }
       } else {
-        auto continuation = [self](const Future<bool>& condition) {
-          if (condition.isReady()) {
-            if (condition.get()) {
-              self->run(self->iterate());
-            } else {
-              self->promise.set(Nothing());
+        auto continuation = [self](const Future<ControlFlow<R>>& flow) {
+          if (flow.isReady()) {
+            switch (flow->statement()) {
+              case ControlFlow<R>::Statement::CONTINUE: {
+                self->run(self->iterate());
+                break;
+              }
+              case ControlFlow<R>::Statement::BREAK: {
+                self->promise.set(flow->value());
+                break;
+              }
             }
-          } else if (condition.isFailed()) {
-            self->promise.fail(condition.failure());
-          } else if (condition.isDiscarded()) {
+          } else if (flow.isFailed()) {
+            self->promise.fail(flow.failure());
+          } else if (flow.isDiscarded()) {
             self->promise.discard();
           }
         };
 
         if (pid.isSome()) {
-          condition.onAny(defer(pid.get(), continuation));
+          flow.onAny(defer(pid.get(), continuation));
         } else {
-          condition.onAny(continuation);
+          flow.onAny(continuation);
         }
 
         if (!promise.future().hasDiscard()) {
           synchronized (mutex) {
-            self->discard = [=]() mutable { condition.discard(); };
+            self->discard = [=]() mutable { flow.discard(); };
           }
         }
 
@@ -242,7 +384,7 @@ public:
         // discard occurs we'll need to explicitly do discards for
         // each new future that blocks.
         if (promise.future().hasDiscard()) {
-          condition.discard();
+          flow.discard();
         }
 
         return;
@@ -282,7 +424,7 @@ private:
   const Option<UPID> pid;
   Iterate iterate;
   Body body;
-  Promise<Nothing> promise;
+  Promise<R> promise;
 
   // In order to discard the loop safely we capture the future that
   // needs to be discarded within the `discard` function and reading
@@ -294,16 +436,14 @@ private:
 } // namespace internal {
 
 
-template <typename Iterate, typename Body>
-Future<Nothing> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body)
+template <typename Iterate, typename Body, typename T, typename CF, typename V>
+Future<V> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body)
 {
-  using T =
-    typename internal::unwrap<typename result_of<Iterate()>::type>::type;
-
   using Loop = internal::Loop<
     typename std::decay<Iterate>::type,
     typename std::decay<Body>::type,
-    T>;
+    T,
+    V>;
 
   std::shared_ptr<Loop> loop(
       new Loop(pid, std::forward<Iterate>(iterate), std::forward<Body>(body)));

http://git-wip-us.apache.org/repos/asf/mesos/blob/bbb4058d/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 97d1424..689a14d 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -1607,9 +1607,9 @@ Future<Nothing> send(
       [=]() mutable {
         return pipeline.get();
       },
-      [=](const Option<Item>& item) -> Future<bool> {
+      [=](const Option<Item>& item) -> Future<ControlFlow<Nothing>> {
         if (item.isNone()) {
-          return false;
+          return Break();
         }
 
         Request* request = item->request;
@@ -1638,7 +1638,7 @@ Future<Nothing> send(
                 case Response::NONE: return send(socket, response, request);
               }
             }()
-            .then([=]() {
+            .then([=]() -> ControlFlow<Nothing> {
               // Persist the connection if the request expects it and
               // the response doesn't include 'Connection: close'.
               bool persist = request->keepAlive;
@@ -1647,7 +1647,10 @@ Future<Nothing> send(
                   persist = false;
                 }
               }
-              return persist;
+              if (persist) {
+                return Continue();
+              }
+              return Break();
             });
           })
           .onAny([=]() {
@@ -1678,9 +1681,9 @@ Future<Nothing> receive(
       [=]() {
         return socket.recv(data, size);
       },
-      [=](size_t length) mutable -> Future<bool> {
+      [=](size_t length) mutable -> Future<ControlFlow<Nothing>> {
         if (length == 0) {
-          return false;
+          return Break();
         }
 
         // Decode as much of the data as possible into HTTP requests.
@@ -1706,7 +1709,7 @@ Future<Nothing> receive(
           pipeline.put(Item{request, f(*request)});
         }
 
-        return true; // Keep looping!
+        return Continue(); // Keep looping!
       })
     .onAny([=]() {
       delete decoder;
@@ -1794,15 +1797,15 @@ Future<Nothing> serve(
              [=]() mutable {
                return pipeline.get();
              },
-             [=](Option<Item> item) {
+             [=](Option<Item> item) -> ControlFlow<Nothing> {
                if (item.isNone()) {
-                 return false;
+                 return Break();
                }
                delete item->request;
                if (promise->future().hasDiscard()) {
                  item->response.discard();
                }
-               return true;
+               return Continue();
              });
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bbb4058d/3rdparty/libprocess/src/io.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
index 27da897..d0b3ba1 100644
--- a/3rdparty/libprocess/src/io.cpp
+++ b/3rdparty/libprocess/src/io.cpp
@@ -329,9 +329,9 @@ Future<Nothing> splice(
       [=]() {
         return io::read(from, data.get(), chunk);
       },
-      [=](size_t length) -> Future<bool> {
+      [=](size_t length) -> Future<ControlFlow<Nothing>> {
         if (length == 0) { // EOF.
-          return false;
+          return Break();
         }
 
         // Send the data to the redirect hooks.
@@ -341,8 +341,8 @@ Future<Nothing> splice(
         }
 
         return io::write(to, s)
-          .then([]() {
-            return true;
+          .then([]() -> Future<ControlFlow<Nothing>> {
+            return Continue();
           });
       });
 }
@@ -444,7 +444,13 @@ Future<Nothing> write(int fd, const string& data)
         nonblock.error());
   }
 
+  // We store `data.size()` so that we can just use `size` in the
+  // second lambda below versus having to make a copy of `data` in
+  // both lambdas since `data` might be very big and two copies could
+  // be expensive!
   const size_t size = data.size();
+
+  // We need to share the `index` between both lambdas below.
   std::shared_ptr<size_t> index(new size_t(0));
 
   return loop(
@@ -452,8 +458,11 @@ Future<Nothing> write(int fd, const string& data)
       [=]() {
         return io::write(fd, data.data() + *index, size - *index);
       },
-      [=](size_t length) {
-        return (*index += length) != size;
+      [=](size_t length) -> ControlFlow<Nothing> {
+        if ((*index += length) != size) {
+          return Continue();
+        }
+        return Break();
       })
     .onAny([fd]() {
         os::close(fd);

http://git-wip-us.apache.org/repos/asf/mesos/blob/bbb4058d/3rdparty/libprocess/src/tests/loop_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/loop_tests.cpp b/3rdparty/libprocess/src/tests/loop_tests.cpp
index 8435ba8..8d1837a 100644
--- a/3rdparty/libprocess/src/tests/loop_tests.cpp
+++ b/3rdparty/libprocess/src/tests/loop_tests.cpp
@@ -13,17 +13,23 @@
 #include <gmock/gmock.h>
 
 #include <atomic>
+#include <string>
 
 #include <process/future.hpp>
 #include <process/gtest.hpp>
 #include <process/loop.hpp>
 #include <process/queue.hpp>
 
+using process::Break;
+using process::Continue;
+using process::ControlFlow;
 using process::Future;
 using process::loop;
 using process::Promise;
 using process::Queue;
 
+using std::string;
+
 
 TEST(LoopTest, Sync)
 {
@@ -33,8 +39,11 @@ TEST(LoopTest, Sync)
       [&]() {
         return value.load();
       },
-      [](int i) {
-        return i != 0;
+      [](int i) -> ControlFlow<Nothing> {
+        if (i != 0) {
+          return Continue();
+        }
+        return Break();
       });
 
   EXPECT_TRUE(future.isPending());
@@ -50,15 +59,18 @@ TEST(LoopTest, Async)
   Queue<int> queue;
 
   Promise<int> promise1;
-  Promise<bool> promise2;
+  Promise<string> promise2;
 
-  Future<Nothing> future = loop(
+  Future<string> future = loop(
       [&]() {
         return queue.get();
       },
       [&](int i) {
         promise1.set(i);
-        return promise2.future();
+        return promise2.future()
+          .then([](const string& s) -> ControlFlow<string> {
+            return Break(s);
+          });
       });
 
   EXPECT_TRUE(future.isPending());
@@ -69,9 +81,11 @@ TEST(LoopTest, Async)
 
   EXPECT_TRUE(future.isPending());
 
-  promise2.set(false);
+  string s = "Hello world!";
 
-  AWAIT_READY(future);
+  promise2.set(s);
+
+  AWAIT_EQ(s, future);
 }
 
 
@@ -85,8 +99,8 @@ TEST(LoopTest, DiscardIterate)
       [&]() {
         return promise.future();
       },
-      [&](int i) {
-        return false;
+      [&](int i) -> ControlFlow<Nothing> {
+        return Break();
       });
 
   EXPECT_TRUE(future.isPending());
@@ -100,7 +114,7 @@ TEST(LoopTest, DiscardIterate)
 
 TEST(LoopTest, DiscardBody)
 {
-  Promise<bool> promise;
+  Promise<Nothing> promise;
 
   promise.future().onDiscard([&]() { promise.discard(); });
 
@@ -109,7 +123,10 @@ TEST(LoopTest, DiscardBody)
         return 42;
       },
       [&](int i) {
-        return promise.future();
+        return promise.future()
+          .then([]() -> ControlFlow<Nothing> {
+            return Break();
+          });
       });
 
   EXPECT_TRUE(future.isPending());