You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2016/12/14 16:50:22 UTC
[1/2] mesos git commit: Added a synchronous version of loop for
io::read/write/redirect.
Repository: mesos
Updated Branches:
refs/heads/master 608e2006e -> a3a65509a
Added a synchronous version of loop for io::read/write/redirect.
Review: https://reviews.apache.org/r/54295
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fe6c3e46
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fe6c3e46
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fe6c3e46
Branch: refs/heads/master
Commit: fe6c3e46926d9f13252b2b4d30825e125449c747
Parents: 608e200
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Dec 1 22:45:01 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Dec 14 08:44:38 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/loop.hpp | 217 +++++++++++++---------
3rdparty/libprocess/src/http.cpp | 3 +-
3rdparty/libprocess/src/io.cpp | 118 ++++--------
3 files changed, 169 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/fe6c3e46/3rdparty/libprocess/include/process/loop.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/loop.hpp b/3rdparty/libprocess/include/process/loop.hpp
index a78ea7d..ac54b63 100644
--- a/3rdparty/libprocess/include/process/loop.hpp
+++ b/3rdparty/libprocess/include/process/loop.hpp
@@ -13,6 +13,8 @@
#ifndef __PROCESS_LOOP_HPP__
#define __PROCESS_LOOP_HPP__
+#include <mutex>
+
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
@@ -29,11 +31,17 @@ namespace process {
// (i.e., a compiler that can't do sufficient tail call optimization
// may add stack frames for each recursive call).
//
-// The loop abstraction takes a PID `pid` and uses it as the execution
-// context to run the loop. The implementation does a `defer` on this
-// `pid` to "pop" the stack when it needs to asynchronously
-// recurse. This also lets callers synchronize execution with other
-// code dispatching and deferring using `pid`.
+// The loop abstraction takes an optional PID `pid` and uses it as the
+// execution context to run the loop. The implementation does a
+// `defer` on this `pid` to "pop" the stack when it needs to
+// asynchronously recurse. This also lets callers synchronize
+// execution with other code dispatching and deferring using `pid`. If
+// `None` is passed for `pid` then no `defer` is done and the stack
+// will still "pop" but be restarted from the execution context
+// wherever the blocked future is completed. This is usually very safe
+// when that blocked future will be completed by the IO thread, but
+// should not be used if it's completed by another process (because
+// you'll block that process until the next time the loop blocks).
//
// The two functions passed to the loop represent the loop "iterate"
// step and the loop "body" step respectively. Each invocation of
@@ -87,13 +95,8 @@ namespace process {
// [](T t) {
// return body(t);
// });
-//
-// TODO(benh): Provide an implementation that doesn't require a `pid`
-// for situations like `io::read` and `io::write` where for
-// performance reasons it could make more sense to NOT defer but
-// rather just let the I/O thread handle the execution.
template <typename Iterate, typename Body>
-Future<Nothing> loop(const UPID& pid, Iterate&& iterate, Body&& body);
+Future<Nothing> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body);
// A helper for `loop` which creates a Process for us to provide an
@@ -120,10 +123,10 @@ template <typename Iterate, typename Body, typename T>
class Loop : public std::enable_shared_from_this<Loop<Iterate, Body, T>>
{
public:
- Loop(const UPID& pid, const Iterate& iterate, const Body& body)
+ Loop(const Option<UPID>& pid, const Iterate& iterate, const Body& body)
: pid(pid), iterate(iterate), body(body) {}
- Loop(const UPID& pid, Iterate&& iterate, Body&& body)
+ Loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body)
: pid(pid), iterate(std::move(iterate)), body(std::move(body)) {}
std::shared_ptr<Loop> shared()
@@ -142,113 +145,157 @@ public:
auto self = shared();
auto weak_self = weak();
- // Make sure we propagate discarding. Note that to avoid an
- // infinite memory bloat we explicitly don't add a new `onDiscard`
- // callback for every new future that gets created from invoking
- // `iterate()` or `body()` but instead discard those futures
- // explicitly with our single callback here.
- promise.future()
- .onDiscard(defer(pid, [weak_self, this]() {
- auto self = weak_self.lock();
- if (self) {
- // NOTE: There's no race here between setting `next` or
- // `condition` and calling `discard()` on those futures
- // because we're serializing execution via `defer` on
- // `pid`. An alternative would require something like
- // `atomic_shared_ptr` or a mutex.
- next.discard();
- condition.discard();
+ // Propagating discards:
+ //
+ // When the caller does a discard we need to propagate it to
+ // either the future returned from `iterate` or the future
+ // returned from `body`. One easy way to do this would be to add
+ // an `onAny` callback for every future returned from `iterate`
+ // and `body`, but that would be a slow memory leak that would
+ // grow over time, especially if the loop was actually
+ // infinite. Instead, we capture the current future that needs to
+ // be discarded within a `discard` function that we'll invoke when
+ // we get a discard. Because there is a race setting the `discard`
+ // function and reading it out to invoke we have to synchronize
+ // access using a mutex. An alternative strategy would be to use
+ // something like `atomic_load` and `atomic_store` with
+ // `shared_ptr` so that we can swap the current future(s)
+ // atomically.
+ promise.future().onDiscard([weak_self, this]() {
+ auto self = weak_self.lock();
+ if (self) {
+ // We need to make a copy of the current `discard` function so
+ // that we can invoke it outside of the `synchronized` block
+ // in the event that discarding invokes causes the `onAny`
+ // callbacks that we have added in `run` to execute which may
+ // deadlock attempting to re-acquire `mutex`!
+ std::function<void()> f = []() {};
+ synchronized (mutex) {
+ f = discard;
}
- }));
-
- // Start the loop using `pid` as the execution context.
- dispatch(pid, [self, this]() {
- next = discard_if_necessary<T>(iterate());
- run();
+ f();
+ }
});
- return promise.future();
- }
-
- // Helper for discarding a future if our promise already has a
- // discard. We need to check this for every future that gets
- // returned from `iterate` and `body` because there is a race
- // between our discard callback (that was set up in `start`) from
- // being executed and us replacing that future on the next call to
- // `iterate` and `body`. Note that we explicitly don't stop the loop
- // if our promise has a discard but rather we just propagate the
- // discard on to any futures returned from `iterate` and `body`. In
- // the event of synchronous `iterate` or `body` functions this could
- // result in an infinite loop.
- template <typename U>
- Future<U> discard_if_necessary(Future<U> future) const
- {
- if (promise.future().hasDiscard()) {
- future.discard();
+ if (pid.isSome()) {
+ // Start the loop using `pid` as the execution context.
+ dispatch(pid.get(), [self, this]() {
+ run(iterate());
+ });
+ } else {
+ run(iterate());
}
- return future;
+
+ return promise.future();
}
- void run()
+ void run(Future<T> next)
{
auto self = shared();
+ // Reset `discard` so that we're not delaying cleanup of any
+ // captured futures longer than necessary.
+ //
+ // TODO(benh): Use `WeakFuture` in `discard` functions instead.
+ discard = []() {};
+
while (next.isReady()) {
- condition = discard_if_necessary<bool>(body(next.get()));
+ Future<bool> condition = body(next.get());
if (condition.isReady()) {
if (condition.get()) {
- next = discard_if_necessary<T>(iterate());
+ next = iterate();
continue;
} else {
promise.set(Nothing());
return;
}
} else {
- condition
- .onAny(defer(pid, [self, this](const Future<bool>&) {
- if (condition.isReady()) {
- if (condition.get()) {
- next = discard_if_necessary<T>(iterate());
- run();
- } else {
- promise.set(Nothing());
- }
- } else if (condition.isFailed()) {
- promise.fail(condition.failure());
- } else if (condition.isDiscarded()) {
- promise.discard();
+ auto continuation = [self, this](const Future<bool>& condition) {
+ if (condition.isReady()) {
+ if (condition.get()) {
+ run(iterate());
+ } else {
+ promise.set(Nothing());
}
- }));
+ } else if (condition.isFailed()) {
+ promise.fail(condition.failure());
+ } else if (condition.isDiscarded()) {
+ promise.discard();
+ }
+ };
+
+ if (pid.isSome()) {
+ condition.onAny(defer(pid.get(), continuation));
+ } else {
+ condition.onAny(continuation);
+ }
+
+ if (!promise.future().hasDiscard()) {
+ synchronized (mutex) {
+ discard = [=]() mutable { condition.discard(); };
+ }
+ }
+
+ // There's a race between when a discard occurs and the
+ // `discard` function gets invoked and therefore we must
+ // explicitly always do a discard. In addition, after a
+ // discard occurs we'll need to explicitly do discards for
+ // each new future that blocks.
+ if (promise.future().hasDiscard()) {
+ condition.discard();
+ }
+
return;
}
}
- next
- .onAny(defer(pid, [self, this](const Future<T>&) {
- if (next.isReady()) {
- run();
- } else if (next.isFailed()) {
- promise.fail(next.failure());
- } else if (next.isDiscarded()) {
- promise.discard();
- }
- }));
+ auto continuation = [self, this](const Future<T>& next) {
+ if (next.isReady()) {
+ run(next);
+ } else if (next.isFailed()) {
+ promise.fail(next.failure());
+ } else if (next.isDiscarded()) {
+ promise.discard();
+ }
+ };
+
+ if (pid.isSome()) {
+ next.onAny(defer(pid.get(), continuation));
+ } else {
+ next.onAny(continuation);
+ }
+
+ if (!promise.future().hasDiscard()) {
+ synchronized (mutex) {
+ discard = [=]() mutable { next.discard(); };
+ }
+ }
+
+ // See comment above as to why we need to explicitly discard
+ // regardless of the path the if statement took above.
+ if (promise.future().hasDiscard()) {
+ next.discard();
+ }
}
private:
- const UPID pid;
+ const Option<UPID> pid;
Iterate iterate;
Body body;
Promise<Nothing> promise;
- Future<T> next;
- Future<bool> condition;
+
+ // In order to discard the loop safely we capture the future that
+ // needs to be discarded within the `discard` function and reading
+ // and writing that function with a mutex.
+ std::mutex mutex;
+ std::function<void()> discard = []() {};
};
} // namespace internal {
template <typename Iterate, typename Body>
-Future<Nothing> loop(const UPID& pid, Iterate&& iterate, Body&& body)
+Future<Nothing> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body)
{
using T =
typename internal::unwrap<typename result_of<Iterate()>::type>::type;
http://git-wip-us.apache.org/repos/asf/mesos/blob/fe6c3e46/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index dc0070c..97d1424 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -1790,7 +1790,8 @@ Future<Nothing> serve(
.onAny([=]() mutable {
// Delete remaining requests and discard remaining responses.
if (pipeline.size() != 0) {
- loop([=]() mutable {
+ loop(None(),
+ [=]() mutable {
return pipeline.get();
},
[=](Option<Item> item) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/fe6c3e46/3rdparty/libprocess/src/io.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
index e81f279..27da897 100644
--- a/3rdparty/libprocess/src/io.cpp
+++ b/3rdparty/libprocess/src/io.cpp
@@ -17,6 +17,7 @@
#include <process/future.hpp>
#include <process/io.hpp>
+#include <process/loop.hpp>
#include <process/process.hpp> // For process::initialize.
#include <stout/lambda.hpp>
@@ -316,76 +317,6 @@ Future<string> _read(
}
-Future<Nothing> _write(
- int fd,
- Owned<string> data,
- size_t index)
-{
- return io::write(fd, data->data() + index, data->size() - index)
- .then([=](size_t length) -> Future<Nothing> {
- if (index + length == data->size()) {
- return Nothing();
- }
- return _write(fd, data, index + length);
- });
-}
-
-
-void _splice(
- int from,
- int to,
- size_t chunk,
- const vector<lambda::function<void(const string&)>>& hooks,
- boost::shared_array<char> data,
- std::shared_ptr<Promise<Nothing>> promise)
-{
- // Stop splicing if a discard occurred on our future.
- if (promise->future().hasDiscard()) {
- // TODO(benh): Consider returning the number of bytes already
- // spliced on discarded, or a failure. Same for the 'onDiscarded'
- // callbacks below.
- promise->discard();
- return;
- }
-
- // Note that only one of io::read or io::write is outstanding at any
- // one point in time thus the reuse of 'data' for both operations.
-
- Future<size_t> read = io::read(from, data.get(), chunk);
-
- // Stop reading (or potentially indefinitely polling) if a discard
- // occcurs on our future.
- promise->future().onDiscard(
- lambda::bind(&process::internal::discard<size_t>,
- WeakFuture<size_t>(read)));
-
- read
- .onReady([=](size_t size) {
- if (size == 0) { // EOF.
- promise->set(Nothing());
- } else {
- // Send the data to the redirect hooks.
- foreach (
- const lambda::function<void(const string&)>& hook,
- hooks) {
- hook(string(data.get(), size));
- }
-
- // Note that we always try and complete the write, even if a
- // discard has occurred on our future, in order to provide
- // semantics where everything read is written. The promise
- // will eventually be discarded in the next read.
- io::write(to, string(data.get(), size))
- .onReady([=]() { _splice(from, to, chunk, hooks, data, promise); })
- .onFailed([=](const string& message) { promise->fail(message); })
- .onDiscarded([=]() { promise->discard(); });
- }
- })
- .onFailed([=](const string& message) { promise->fail(message); })
- .onDiscarded([=]() { promise->discard(); });
-}
-
-
Future<Nothing> splice(
int from,
int to,
@@ -393,20 +324,30 @@ Future<Nothing> splice(
const vector<lambda::function<void(const string&)>>& hooks)
{
boost::shared_array<char> data(new char[chunk]);
+ return loop(
+ None(),
+ [=]() {
+ return io::read(from, data.get(), chunk);
+ },
+ [=](size_t length) -> Future<bool> {
+ if (length == 0) { // EOF.
+ return false;
+ }
- // Rather than having internal::_splice return a future and
- // implementing internal::_splice as a chain of io::read and
- // io::write calls, we use an explicit promise that we pass around
- // so that we don't increase memory usage the longer that we splice.
- std::shared_ptr<Promise<Nothing>> promise(new Promise<Nothing>());
-
- Future<Nothing> future = promise->future();
-
- _splice(from, to, chunk, hooks, data, promise);
+ // Send the data to the redirect hooks.
+ const string s = string(data.get(), length);
+ foreach (const lambda::function<void(const string&)>& hook, hooks) {
+ hook(s);
+ }
- return future;
+ return io::write(to, s)
+ .then([]() {
+ return true;
+ });
+ });
}
+
} // namespace internal {
@@ -503,9 +444,20 @@ Future<Nothing> write(int fd, const string& data)
nonblock.error());
}
- // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
- return internal::_write(fd, Owned<string>(new string(data)), 0)
- .onAny([fd]() { os::close(fd); });
+ const size_t size = data.size();
+ std::shared_ptr<size_t> index(new size_t(0));
+
+ return loop(
+ None(),
+ [=]() {
+ return io::write(fd, data.data() + *index, size - *index);
+ },
+ [=](size_t length) {
+ return (*index += length) != size;
+ })
+ .onAny([fd]() {
+ os::close(fd);
+ });
}
[2/2] mesos git commit: Used process::loop in infinitely recursive
functions.
Posted by be...@apache.org.
Used process::loop in infinitely recursive functions.
Review: https://reviews.apache.org/r/54751
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a3a65509
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a3a65509
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a3a65509
Branch: refs/heads/master
Commit: a3a65509acebefa285d09079d39a0ebf7b5f086b
Parents: fe6c3e4
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Dec 14 08:32:08 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Dec 14 08:45:22 2016 -0800
----------------------------------------------------------------------
src/common/recordio.hpp | 45 ++++++++++++++++++++++++--------------------
src/slave/http.cpp | 34 ++++++++++++++++-----------------
2 files changed, 42 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a3a65509/src/common/recordio.hpp
----------------------------------------------------------------------
diff --git a/src/common/recordio.hpp b/src/common/recordio.hpp
index 0f6b47b..5a22d06 100644
--- a/src/common/recordio.hpp
+++ b/src/common/recordio.hpp
@@ -26,6 +26,7 @@
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/http.hpp>
+#include <process/loop.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
@@ -113,26 +114,30 @@ process::Future<Nothing> transform(
const std::function<std::string(const T&)>& func,
process::http::Pipe::Writer writer)
{
- return reader->read()
- .then([=](const Result<T>& record) mutable -> process::Future<Nothing> {
- // This could happen if EOF is sent by the writer.
- if (record.isNone()) {
- return Nothing();
- }
-
- // This could happen if there is a de-serialization error.
- if (record.isError()) {
- return process::Failure(record.error());
- }
-
- // TODO(vinod): Instead of detecting that the reader went away only
- // after attempting a write, leverage `writer.readerClosed` future.
- if (!writer.write(func(record.get()))) {
- return process::Failure("Write failed to the pipe");
- }
-
- return transform(std::move(reader), func, writer);
- });
+ return process::loop(
+ None(),
+ [=]() {
+ return reader->read();
+ },
+ [=](const Result<T>& record) mutable -> process::Future<bool> {
+ // This could happen if EOF is sent by the writer.
+ if (record.isNone()) {
+ return false;
+ }
+
+ // This could happen if there is a de-serialization error.
+ if (record.isError()) {
+ return process::Failure(record.error());
+ }
+
+ // TODO(vinod): Instead of detecting that the reader went away only
+ // after attempting a write, leverage `writer.readerClosed` future.
+ if (!writer.write(func(record.get()))) {
+ return process::Failure("Write failed to the pipe");
+ }
+
+ return true;
+ });
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a3a65509/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 4cd352f..ecec24a 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -42,6 +42,7 @@
#include <process/http.hpp>
#include <process/limiter.hpp>
#include <process/logging.hpp>
+#include <process/loop.hpp>
#include <process/owned.hpp>
#include <process/metrics/metrics.hpp>
@@ -85,6 +86,7 @@ using process::Failure;
using process::Future;
using process::HELP;
using process::Logging;
+using process::loop;
using process::Owned;
using process::TLDR;
@@ -2582,25 +2584,23 @@ Future<Response> Slave::Http::attachContainerInput(
// TODO(vinod): Move this to libprocess if this is more generally useful.
Future<Nothing> connect(Pipe::Reader reader, Pipe::Writer writer)
{
- return reader.read()
- .then([reader, writer](const Future<string>& chunk) mutable
- -> Future<Nothing> {
- if (!chunk.isReady()) {
- return process::Failure(
- chunk.isFailed() ? chunk.failure() : "discarded");
- }
-
- if (chunk->empty()) {
- // EOF case.
- return Nothing();
- }
+ return loop(
+ None(),
+ [=]() mutable {
+ return reader.read();
+ },
+ [=](const string& chunk) mutable -> Future<bool> {
+ if (chunk.empty()) {
+ // EOF case.
+ return false;
+ }
- if (!writer.write(chunk.get())) {
- return process::Failure("Write failed to the pipe");
- }
+ if (!writer.write(chunk)) {
+ return Failure("Write failed to the pipe");
+ }
- return connect(reader, writer);
- });
+ return true;
+ });
}