You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2016/12/14 16:50:22 UTC

[1/2] mesos git commit: Added a synchronous version of loop for io::read/write/redirect.

Repository: mesos
Updated Branches:
  refs/heads/master 608e2006e -> a3a65509a


Added a synchronous version of loop for io::read/write/redirect.

Review: https://reviews.apache.org/r/54295


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fe6c3e46
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fe6c3e46
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fe6c3e46

Branch: refs/heads/master
Commit: fe6c3e46926d9f13252b2b4d30825e125449c747
Parents: 608e200
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Dec 1 22:45:01 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Dec 14 08:44:38 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/loop.hpp | 217 +++++++++++++---------
 3rdparty/libprocess/src/http.cpp             |   3 +-
 3rdparty/libprocess/src/io.cpp               | 118 ++++--------
 3 files changed, 169 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fe6c3e46/3rdparty/libprocess/include/process/loop.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/loop.hpp b/3rdparty/libprocess/include/process/loop.hpp
index a78ea7d..ac54b63 100644
--- a/3rdparty/libprocess/include/process/loop.hpp
+++ b/3rdparty/libprocess/include/process/loop.hpp
@@ -13,6 +13,8 @@
 #ifndef __PROCESS_LOOP_HPP__
 #define __PROCESS_LOOP_HPP__
 
+#include <mutex>
+
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/future.hpp>
@@ -29,11 +31,17 @@ namespace process {
 // (i.e., a compiler that can't do sufficient tail call optimization
 // may add stack frames for each recursive call).
 //
-// The loop abstraction takes a PID `pid` and uses it as the execution
-// context to run the loop. The implementation does a `defer` on this
-// `pid` to "pop" the stack when it needs to asynchronously
-// recurse. This also lets callers synchronize execution with other
-// code dispatching and deferring using `pid`.
+// The loop abstraction takes an optional PID `pid` and uses it as the
+// execution context to run the loop. The implementation does a
+// `defer` on this `pid` to "pop" the stack when it needs to
+// asynchronously recurse. This also lets callers synchronize
+// execution with other code dispatching and deferring using `pid`. If
+// `None` is passed for `pid` then no `defer` is done and the stack
+// will still "pop" but be restarted from the execution context
+// wherever the blocked future is completed. This is usually very safe
+// when that blocked future will be completed by the IO thread, but
+// should not be used if it's completed by another process (because
+// you'll block that process until the next time the loop blocks).
 //
 // The two functions passed to the loop represent the loop "iterate"
 // step and the loop "body" step respectively. Each invocation of
@@ -87,13 +95,8 @@ namespace process {
 //          [](T t) {
 //            return body(t);
 //          });
-//
-// TODO(benh): Provide an implementation that doesn't require a `pid`
-// for situations like `io::read` and `io::write` where for
-// performance reasons it could make more sense to NOT defer but
-// rather just let the I/O thread handle the execution.
 template <typename Iterate, typename Body>
-Future<Nothing> loop(const UPID& pid, Iterate&& iterate, Body&& body);
+Future<Nothing> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body);
 
 
 // A helper for `loop` which creates a Process for us to provide an
@@ -120,10 +123,10 @@ template <typename Iterate, typename Body, typename T>
 class Loop : public std::enable_shared_from_this<Loop<Iterate, Body, T>>
 {
 public:
-  Loop(const UPID& pid, const Iterate& iterate, const Body& body)
+  Loop(const Option<UPID>& pid, const Iterate& iterate, const Body& body)
     : pid(pid), iterate(iterate), body(body) {}
 
-  Loop(const UPID& pid, Iterate&& iterate, Body&& body)
+  Loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body)
     : pid(pid), iterate(std::move(iterate)), body(std::move(body)) {}
 
   std::shared_ptr<Loop> shared()
@@ -142,113 +145,157 @@ public:
     auto self = shared();
     auto weak_self = weak();
 
-    // Make sure we propagate discarding. Note that to avoid an
-    // infinite memory bloat we explicitly don't add a new `onDiscard`
-    // callback for every new future that gets created from invoking
-    // `iterate()` or `body()` but instead discard those futures
-    // explicitly with our single callback here.
-    promise.future()
-      .onDiscard(defer(pid, [weak_self, this]() {
-        auto self = weak_self.lock();
-        if (self) {
-          // NOTE: There's no race here between setting `next` or
-          // `condition` and calling `discard()` on those futures
-          // because we're serializing execution via `defer` on
-          // `pid`. An alternative would require something like
-          // `atomic_shared_ptr` or a mutex.
-          next.discard();
-          condition.discard();
+    // Propagating discards:
+    //
+    // When the caller does a discard we need to propagate it to
+    // either the future returned from `iterate` or the future
+    // returned from `body`. One easy way to do this would be to add
+    // an `onAny` callback for every future returned from `iterate`
+    // and `body`, but that would be a slow memory leak that would
+    // grow over time, especially if the loop was actually
+    // infinite. Instead, we capture the current future that needs to
+    // be discarded within a `discard` function that we'll invoke when
+    // we get a discard. Because setting the `discard` function races
+    // with reading it out to invoke it, we have to synchronize
+    // access using a mutex. An alternative strategy would be to use
+    // something like `atomic_load` and `atomic_store` with
+    // `shared_ptr` so that we can swap the current future(s)
+    // atomically.
+    promise.future().onDiscard([weak_self, this]() {
+      auto self = weak_self.lock();
+      if (self) {
+        // We need to make a copy of the current `discard` function so
+        // that we can invoke it outside of the `synchronized` block
+        // in the event that discarding causes the `onAny`
+        // callbacks that we have added in `run` to execute which may
+        // deadlock attempting to re-acquire `mutex`!
+        std::function<void()> f = []() {};
+        synchronized (mutex) {
+          f = discard;
         }
-      }));
-
-    // Start the loop using `pid` as the execution context.
-    dispatch(pid, [self, this]() {
-      next = discard_if_necessary<T>(iterate());
-      run();
+        f();
+      }
     });
 
-    return promise.future();
-  }
-
-  // Helper for discarding a future if our promise already has a
-  // discard. We need to check this for every future that gets
-  // returned from `iterate` and `body` because there is a race
-  // between our discard callback (that was set up in `start`) from
-  // being executed and us replacing that future on the next call to
-  // `iterate` and `body`. Note that we explicitly don't stop the loop
-  // if our promise has a discard but rather we just propagate the
-  // discard on to any futures returned from `iterate` and `body`. In
-  // the event of synchronous `iterate` or `body` functions this could
-  // result in an infinite loop.
-  template <typename U>
-  Future<U> discard_if_necessary(Future<U> future) const
-  {
-    if (promise.future().hasDiscard()) {
-      future.discard();
+    if (pid.isSome()) {
+      // Start the loop using `pid` as the execution context.
+      dispatch(pid.get(), [self, this]() {
+        run(iterate());
+      });
+    } else {
+      run(iterate());
     }
-    return future;
+
+    return promise.future();
   }
 
-  void run()
+  void run(Future<T> next)
   {
     auto self = shared();
 
+    // Reset `discard` so that we're not delaying cleanup of any
+    // captured futures longer than necessary.
+    //
+    // TODO(benh): Use `WeakFuture` in `discard` functions instead.
+    discard = []() {};
+
     while (next.isReady()) {
-      condition = discard_if_necessary<bool>(body(next.get()));
+      Future<bool> condition = body(next.get());
       if (condition.isReady()) {
         if (condition.get()) {
-          next = discard_if_necessary<T>(iterate());
+          next = iterate();
           continue;
         } else {
           promise.set(Nothing());
           return;
         }
       } else {
-        condition
-          .onAny(defer(pid, [self, this](const Future<bool>&) {
-            if (condition.isReady()) {
-              if (condition.get()) {
-                next = discard_if_necessary<T>(iterate());
-                run();
-              } else {
-                promise.set(Nothing());
-              }
-            } else if (condition.isFailed()) {
-              promise.fail(condition.failure());
-            } else if (condition.isDiscarded()) {
-              promise.discard();
+        auto continuation = [self, this](const Future<bool>& condition) {
+          if (condition.isReady()) {
+            if (condition.get()) {
+              run(iterate());
+            } else {
+              promise.set(Nothing());
             }
-          }));
+          } else if (condition.isFailed()) {
+            promise.fail(condition.failure());
+          } else if (condition.isDiscarded()) {
+            promise.discard();
+          }
+        };
+
+        if (pid.isSome()) {
+          condition.onAny(defer(pid.get(), continuation));
+        } else {
+          condition.onAny(continuation);
+        }
+
+        if (!promise.future().hasDiscard()) {
+          synchronized (mutex) {
+            discard = [=]() mutable { condition.discard(); };
+          }
+        }
+
+        // There's a race between when a discard occurs and when the
+        // `discard` function gets invoked, and therefore we must
+        // explicitly always do a discard. In addition, after a
+        // discard occurs we'll need to explicitly do discards for
+        // each new future that blocks.
+        if (promise.future().hasDiscard()) {
+          condition.discard();
+        }
+
         return;
       }
     }
 
-    next
-      .onAny(defer(pid, [self, this](const Future<T>&) {
-        if (next.isReady()) {
-          run();
-        } else if (next.isFailed()) {
-          promise.fail(next.failure());
-        } else if (next.isDiscarded()) {
-          promise.discard();
-        }
-      }));
+    auto continuation = [self, this](const Future<T>& next) {
+      if (next.isReady()) {
+        run(next);
+      } else if (next.isFailed()) {
+        promise.fail(next.failure());
+      } else if (next.isDiscarded()) {
+        promise.discard();
+      }
+    };
+
+    if (pid.isSome()) {
+      next.onAny(defer(pid.get(), continuation));
+    } else {
+      next.onAny(continuation);
+    }
+
+    if (!promise.future().hasDiscard()) {
+      synchronized (mutex) {
+        discard = [=]() mutable { next.discard(); };
+      }
+    }
+
+    // See comment above as to why we need to explicitly discard
+    // regardless of the path the if statement took above.
+    if (promise.future().hasDiscard()) {
+      next.discard();
+    }
   }
 
 private:
-  const UPID pid;
+  const Option<UPID> pid;
   Iterate iterate;
   Body body;
   Promise<Nothing> promise;
-  Future<T> next;
-  Future<bool> condition;
+
+  // In order to discard the loop safely we capture the future that
+  // needs to be discarded within the `discard` function and guard
+  // reading and writing that function with a mutex.
+  std::mutex mutex;
+  std::function<void()> discard = []() {};
 };
 
 } // namespace internal {
 
 
 template <typename Iterate, typename Body>
-Future<Nothing> loop(const UPID& pid, Iterate&& iterate, Body&& body)
+Future<Nothing> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body)
 {
   using T =
     typename internal::unwrap<typename result_of<Iterate()>::type>::type;

http://git-wip-us.apache.org/repos/asf/mesos/blob/fe6c3e46/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index dc0070c..97d1424 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -1790,7 +1790,8 @@ Future<Nothing> serve(
     .onAny([=]() mutable {
       // Delete remaining requests and discard remaining responses.
       if (pipeline.size() != 0) {
-        loop([=]() mutable {
+        loop(None(),
+             [=]() mutable {
                return pipeline.get();
              },
              [=](Option<Item> item) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/fe6c3e46/3rdparty/libprocess/src/io.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
index e81f279..27da897 100644
--- a/3rdparty/libprocess/src/io.cpp
+++ b/3rdparty/libprocess/src/io.cpp
@@ -17,6 +17,7 @@
 
 #include <process/future.hpp>
 #include <process/io.hpp>
+#include <process/loop.hpp>
 #include <process/process.hpp> // For process::initialize.
 
 #include <stout/lambda.hpp>
@@ -316,76 +317,6 @@ Future<string> _read(
 }
 
 
-Future<Nothing> _write(
-    int fd,
-    Owned<string> data,
-    size_t index)
-{
-  return io::write(fd, data->data() + index, data->size() - index)
-    .then([=](size_t length) -> Future<Nothing> {
-      if (index + length == data->size()) {
-        return Nothing();
-      }
-      return _write(fd, data, index + length);
-    });
-}
-
-
-void _splice(
-    int from,
-    int to,
-    size_t chunk,
-    const vector<lambda::function<void(const string&)>>& hooks,
-    boost::shared_array<char> data,
-    std::shared_ptr<Promise<Nothing>> promise)
-{
-  // Stop splicing if a discard occurred on our future.
-  if (promise->future().hasDiscard()) {
-    // TODO(benh): Consider returning the number of bytes already
-    // spliced on discarded, or a failure. Same for the 'onDiscarded'
-    // callbacks below.
-    promise->discard();
-    return;
-  }
-
-  // Note that only one of io::read or io::write is outstanding at any
-  // one point in time thus the reuse of 'data' for both operations.
-
-  Future<size_t> read = io::read(from, data.get(), chunk);
-
-  // Stop reading (or potentially indefinitely polling) if a discard
-  // occcurs on our future.
-  promise->future().onDiscard(
-      lambda::bind(&process::internal::discard<size_t>,
-                   WeakFuture<size_t>(read)));
-
-  read
-    .onReady([=](size_t size) {
-      if (size == 0) { // EOF.
-        promise->set(Nothing());
-      } else {
-        // Send the data to the redirect hooks.
-        foreach (
-            const lambda::function<void(const string&)>& hook,
-            hooks) {
-          hook(string(data.get(), size));
-        }
-
-        // Note that we always try and complete the write, even if a
-        // discard has occurred on our future, in order to provide
-        // semantics where everything read is written. The promise
-        // will eventually be discarded in the next read.
-        io::write(to, string(data.get(), size))
-          .onReady([=]() { _splice(from, to, chunk, hooks, data, promise); })
-          .onFailed([=](const string& message) { promise->fail(message); })
-          .onDiscarded([=]() { promise->discard(); });
-      }
-    })
-    .onFailed([=](const string& message) { promise->fail(message); })
-    .onDiscarded([=]() { promise->discard(); });
-}
-
-
 Future<Nothing> splice(
     int from,
     int to,
@@ -393,20 +324,30 @@ Future<Nothing> splice(
     const vector<lambda::function<void(const string&)>>& hooks)
 {
   boost::shared_array<char> data(new char[chunk]);
+  return loop(
+      None(),
+      [=]() {
+        return io::read(from, data.get(), chunk);
+      },
+      [=](size_t length) -> Future<bool> {
+        if (length == 0) { // EOF.
+          return false;
+        }
 
-  // Rather than having internal::_splice return a future and
-  // implementing internal::_splice as a chain of io::read and
-  // io::write calls, we use an explicit promise that we pass around
-  // so that we don't increase memory usage the longer that we splice.
-  std::shared_ptr<Promise<Nothing>> promise(new Promise<Nothing>());
-
-  Future<Nothing> future = promise->future();
-
-  _splice(from, to, chunk, hooks, data, promise);
+        // Send the data to the redirect hooks.
+        const string s = string(data.get(), length);
+        foreach (const lambda::function<void(const string&)>& hook, hooks) {
+          hook(s);
+        }
 
-  return future;
+        return io::write(to, s)
+          .then([]() {
+            return true;
+          });
+      });
 }
 
+
 } // namespace internal {
 
 
@@ -503,9 +444,20 @@ Future<Nothing> write(int fd, const string& data)
         nonblock.error());
   }
 
-  // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
-  return internal::_write(fd, Owned<string>(new string(data)), 0)
-    .onAny([fd]() { os::close(fd); });
+  const size_t size = data.size();
+  std::shared_ptr<size_t> index(new size_t(0));
+
+  return loop(
+      None(),
+      [=]() {
+        return io::write(fd, data.data() + *index, size - *index);
+      },
+      [=](size_t length) {
+        return (*index += length) != size;
+      })
+    .onAny([fd]() {
+        os::close(fd);
+    });
 }
 
 


[2/2] mesos git commit: Used process::loop in infinitely recursive functions.

Posted by be...@apache.org.
Used process::loop in infinitely recursive functions.

Review: https://reviews.apache.org/r/54751


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a3a65509
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a3a65509
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a3a65509

Branch: refs/heads/master
Commit: a3a65509acebefa285d09079d39a0ebf7b5f086b
Parents: fe6c3e4
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Dec 14 08:32:08 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Dec 14 08:45:22 2016 -0800

----------------------------------------------------------------------
 src/common/recordio.hpp | 45 ++++++++++++++++++++++++--------------------
 src/slave/http.cpp      | 34 ++++++++++++++++-----------------
 2 files changed, 42 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a3a65509/src/common/recordio.hpp
----------------------------------------------------------------------
diff --git a/src/common/recordio.hpp b/src/common/recordio.hpp
index 0f6b47b..5a22d06 100644
--- a/src/common/recordio.hpp
+++ b/src/common/recordio.hpp
@@ -26,6 +26,7 @@
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/http.hpp>
+#include <process/loop.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 #include <process/process.hpp>
@@ -113,26 +114,30 @@ process::Future<Nothing> transform(
     const std::function<std::string(const T&)>& func,
     process::http::Pipe::Writer writer)
 {
-  return reader->read()
-    .then([=](const Result<T>& record) mutable -> process::Future<Nothing> {
-      // This could happen if EOF is sent by the writer.
-      if (record.isNone()) {
-        return Nothing();
-      }
-
-      // This could happen if there is a de-serialization error.
-      if (record.isError()) {
-        return process::Failure(record.error());
-      }
-
-      // TODO(vinod): Instead of detecting that the reader went away only
-      // after attempting a write, leverage `writer.readerClosed` future.
-      if (!writer.write(func(record.get()))) {
-        return process::Failure("Write failed to the pipe");
-      }
-
-      return transform(std::move(reader), func, writer);
-  });
+  return process::loop(
+      None(),
+      [=]() {
+        return reader->read();
+      },
+      [=](const Result<T>& record) mutable -> process::Future<bool> {
+        // This could happen if EOF is sent by the writer.
+        if (record.isNone()) {
+          return false;
+        }
+
+        // This could happen if there is a de-serialization error.
+        if (record.isError()) {
+          return process::Failure(record.error());
+        }
+
+        // TODO(vinod): Instead of detecting that the reader went away only
+        // after attempting a write, leverage `writer.readerClosed` future.
+        if (!writer.write(func(record.get()))) {
+          return process::Failure("Write failed to the pipe");
+        }
+
+        return true;
+      });
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a3a65509/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 4cd352f..ecec24a 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -42,6 +42,7 @@
 #include <process/http.hpp>
 #include <process/limiter.hpp>
 #include <process/logging.hpp>
+#include <process/loop.hpp>
 #include <process/owned.hpp>
 
 #include <process/metrics/metrics.hpp>
@@ -85,6 +86,7 @@ using process::Failure;
 using process::Future;
 using process::HELP;
 using process::Logging;
+using process::loop;
 using process::Owned;
 using process::TLDR;
 
@@ -2582,25 +2584,23 @@ Future<Response> Slave::Http::attachContainerInput(
 // TODO(vinod): Move this to libprocess if this is more generally useful.
 Future<Nothing> connect(Pipe::Reader reader, Pipe::Writer writer)
 {
-  return reader.read()
-    .then([reader, writer](const Future<string>& chunk) mutable
-        -> Future<Nothing> {
-      if (!chunk.isReady()) {
-        return process::Failure(
-            chunk.isFailed() ? chunk.failure() : "discarded");
-      }
-
-      if (chunk->empty()) {
-        // EOF case.
-        return Nothing();
-      }
+  return loop(
+      None(),
+      [=]() mutable {
+        return reader.read();
+      },
+      [=](const string& chunk) mutable -> Future<bool> {
+        if (chunk.empty()) {
+          // EOF case.
+          return false;
+        }
 
-      if (!writer.write(chunk.get())) {
-        return process::Failure("Write failed to the pipe");
-      }
+        if (!writer.write(chunk)) {
+          return Failure("Write failed to the pipe");
+        }
 
-      return connect(reader, writer);
-    });
+        return true;
+      });
 }