You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/09/27 01:01:41 UTC
[1/3] mesos git commit: Added MESOS-7964 to 1.4.1 CHANGELOG.
Repository: mesos
Updated Branches:
refs/heads/1.4.x f82cd43dc -> 8613a7a8b
Added MESOS-7964 to 1.4.1 CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8613a7a8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8613a7a8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8613a7a8
Branch: refs/heads/1.4.x
Commit: 8613a7a8b953a722f19fc59e07dc327c58033406
Parents: 27b8356
Author: Jiang Yan Xu <xu...@apple.com>
Authored: Tue Sep 26 17:25:45 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Sep 26 17:30:12 2017 -0700
----------------------------------------------------------------------
CHANGELOG | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8613a7a8/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 16b1db2..00a8dcc 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -6,6 +6,7 @@ All Resolved Issues:
**Bug
* [MESOS-7873] - Expose `ExecutorInfo.ContainerInfo.NetworkInfo` in Mesos `state` endpoint.
+ * [MESOS-7964] - Heavy-duty GC makes the agent unresponsive.
* [MESOS-7968] - Handle `/proc/self/ns/pid_for_children` when parsing available namespace.
* [MESOS-7969] - Handle cgroups v2 hierarchy when parsing /proc/self/cgroups.
* [MESOS-7980] - Stout fails to compile with libc >= 2.26.
[2/3] mesos git commit: Prevent GC path removals from blocking other
processes.
Posted by ya...@apache.org.
Prevent GC path removals from blocking other processes.
This patch dispatches all path removals to a single executor instead of
one `AsyncExecutor` per path, so that heavy-duty removals won't occupy
all worker threads and block other actors.
Review: https://reviews.apache.org/r/62230/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/27b83565
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/27b83565
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/27b83565
Branch: refs/heads/1.4.x
Commit: 27b83565082720cbc9c93b3b892305b899af84b7
Parents: bf82953
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Sep 26 17:07:11 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Sep 26 17:30:12 2017 -0700
----------------------------------------------------------------------
src/slave/gc.cpp | 6 ++++--
src/slave/gc.hpp | 4 ++++
2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/27b83565/src/slave/gc.cpp
----------------------------------------------------------------------
diff --git a/src/slave/gc.cpp b/src/slave/gc.cpp
index f270fa4..86d94c8 100644
--- a/src/slave/gc.cpp
+++ b/src/slave/gc.cpp
@@ -16,7 +16,6 @@
#include <list>
-#include <process/async.hpp>
#include <process/check.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
@@ -183,7 +182,10 @@ void GarbageCollectorProcess::remove(const Timeout& removalTime)
return Nothing();
};
- async(rmdirs)
+ // NOTE: All `rmdirs` calls are dispatched to one executor so that:
+ // 1. They do not block other dispatches (MESOS-6549).
+ // 2. They do not occupy all worker threads (MESOS-7964).
+ executor.execute(rmdirs)
.onAny(defer(self(), &Self::_remove, lambda::_1, infos));
} else {
// This occurs when either:
http://git-wip-us.apache.org/repos/asf/mesos/blob/27b83565/src/slave/gc.hpp
----------------------------------------------------------------------
diff --git a/src/slave/gc.hpp b/src/slave/gc.hpp
index b8d5301..f23ffd3 100644
--- a/src/slave/gc.hpp
+++ b/src/slave/gc.hpp
@@ -21,6 +21,7 @@
#include <string>
#include <vector>
+#include <process/executor.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/owned.hpp>
@@ -140,6 +141,9 @@ private:
hashmap<std::string, process::Timeout> timeouts;
process::Timer timer;
+
+ // For executing path removals in a separate actor.
+ process::Executor executor;
};
} // namespace slave {
[3/3] mesos git commit: Added `process::Executor::execute()`.
Posted by ya...@apache.org.
Added `process::Executor::execute()`.
This patch adds a convenient interface to `process::Executor` to
asynchronously execute arbitrary functions.
Review: https://reviews.apache.org/r/62252/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bf82953f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bf82953f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bf82953f
Branch: refs/heads/1.4.x
Commit: bf82953f1ede7ddf182f9cad79a3248ef2630dc8
Parents: f82cd43
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Mon Sep 25 14:10:27 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Sep 26 17:30:12 2017 -0700
----------------------------------------------------------------------
.../libprocess/include/process/executor.hpp | 46 ++++++++++-
3rdparty/libprocess/src/tests/process_tests.cpp | 85 +++++++++++++++++---
2 files changed, 115 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bf82953f/3rdparty/libprocess/include/process/executor.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/executor.hpp b/3rdparty/libprocess/include/process/executor.hpp
index cd2f2f0..95a9e69 100644
--- a/3rdparty/libprocess/include/process/executor.hpp
+++ b/3rdparty/libprocess/include/process/executor.hpp
@@ -17,12 +17,15 @@
#include <process/id.hpp>
#include <process/process.hpp>
+#include <stout/nothing.hpp>
+#include <stout/result_of.hpp>
+
namespace process {
// Provides an abstraction that can take a standard function object
-// and defer it without needing a process. Each converted function
-// object will get execute serially with respect to one another when
-// invoked.
+// and defer or asynchronously execute it without needing a process.
+// Each converted function object will get executed serially with respect
+// to one another when invoked.
class Executor
{
public:
@@ -39,7 +42,7 @@ public:
void stop()
{
- terminate(&process);
+ terminate(process);
// TODO(benh): Note that this doesn't wait because that could
// cause a deadlock ... thus, the semantics here are that no more
@@ -60,6 +63,41 @@ private:
Executor& operator=(const Executor&);
ProcessBase process;
+
+
+public:
+ template <
+ typename F,
+ typename R = typename result_of<F()>::type,
+ typename std::enable_if<!std::is_void<R>::value, int>::type = 0>
+ auto execute(F&& f)
+ -> decltype(dispatch(process, std::function<R()>(std::forward<F>(f))))
+ {
+ // NOTE: Currently we cannot pass a mutable lambda into `dispatch()`
+ // because it would be captured by copy, so we convert `f` into a
+ // `std::function` to bypass this restriction.
+ return dispatch(process, std::function<R()>(std::forward<F>(f)));
+ }
+
+ // NOTE: This overload for `void` returns `Future<Nothing>` so we can
+ // chain. This follows the same behavior as `async()`.
+ template <
+ typename F,
+ typename R = typename result_of<F()>::type,
+ typename std::enable_if<std::is_void<R>::value, int>::type = 0>
+ Future<Nothing> execute(F&& f)
+ {
+ // NOTE: Currently we cannot pass a mutable lambda into `dispatch()`
+ // because it would be captured by copy, so we convert `f` into a
+ // `std::function` to bypass this restriction. This wrapper also
+ // avoids `f` being evaluated when it is a nested bind.
+ // TODO(chhsiao): Capture `f` by forwarding once we switch to C++14.
+ return dispatch(
+ process,
+ std::bind(
+ [](const std::function<R()>& f_) { f_(); return Nothing(); },
+ std::function<R()>(std::forward<F>(f))));
+ }
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/bf82953f/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 82efb2f..e494318 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -31,6 +31,7 @@
#include <process/async.hpp>
#include <process/clock.hpp>
+#include <process/count_down_latch.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
@@ -70,6 +71,7 @@ namespace inet4 = process::network::inet4;
using process::async;
using process::Clock;
+using process::CountDownLatch;
using process::defer;
using process::Deferred;
using process::Event;
@@ -102,6 +104,7 @@ using std::vector;
using testing::_;
using testing::Assign;
using testing::DoAll;
+using testing::InvokeWithoutArgs;
using testing::Return;
using testing::ReturnArg;
@@ -1201,20 +1204,17 @@ public:
};
-TEST(ProcessTest, THREADSAFE_Executor)
+TEST(ProcessTest, THREADSAFE_Executor_Defer)
{
- std::atomic_bool event1Called(false);
- std::atomic_bool event2Called(false);
-
EventReceiver receiver;
+ Executor executor;
- EXPECT_CALL(receiver, event1(42))
- .WillOnce(Assign(&event1Called, true));
-
- EXPECT_CALL(receiver, event2("event2"))
- .WillOnce(Assign(&event2Called, true));
+ CountDownLatch event1Called;
- Executor executor;
+ EXPECT_CALL(receiver, event1(42))
+ .WillOnce(InvokeWithoutArgs([&]() {
+ event1Called.decrement();
+ }));
Deferred<void(int)> event1 =
executor.defer([&receiver](int i) {
@@ -1223,6 +1223,15 @@ TEST(ProcessTest, THREADSAFE_Executor)
event1(42);
+ AWAIT_READY(event1Called.triggered());
+
+ CountDownLatch event2Called;
+
+ EXPECT_CALL(receiver, event2("event2"))
+ .WillOnce(InvokeWithoutArgs([&]() {
+ event2Called.decrement();
+ }));
+
Deferred<void(const string&)> event2 =
executor.defer([&receiver](const string& s) {
return receiver.event2(s);
@@ -1230,8 +1239,60 @@ TEST(ProcessTest, THREADSAFE_Executor)
event2("event2");
- while (event1Called.load() == false);
- while (event2Called.load() == false);
+ AWAIT_READY(event2Called.triggered());
+}
+
+
+TEST(ProcessTest, THREADSAFE_Executor_Execute)
+{
+ Executor executor;
+
+ // A void immutable lambda.
+ CountDownLatch f1Result;
+ auto f1 = [&f1Result] {
+ f1Result.decrement();
+ };
+
+ // Specify the return type explicitly for type checking. Same below.
+ Future<Nothing> f1Called = executor.execute(f1);
+
+ AWAIT_READY(f1Called);
+ AWAIT_READY(f1Result.triggered());
+
+ // A void mutable bind.
+ CountDownLatch f2Result;
+ int f2State = 0;
+ auto f2 = [&f2Result, f2State](int) mutable -> void {
+ f2State++;
+ f2Result.decrement();
+ };
+
+ Future<Nothing> f2Called = executor.execute(std::bind(f2, 42));
+
+ AWAIT_READY(f2Called);
+ AWAIT_READY(f2Result.triggered());
+
+ // A non-void immutable lambda.
+ const string f3Result = "f3";
+ auto f3 = [&f3Result] {
+ return f3Result;
+ };
+
+ Future<string> f3Called = executor.execute(f3);
+
+ AWAIT_EXPECT_EQ(f3Result, f3Called);
+
+ // A mutable bind returning a future.
+ const string f4Result = "f4";
+ int f4State = 0;
+ auto f4 = [&f4Result, f4State](int) mutable -> Future<string> {
+ f4State++;
+ return f4Result;
+ };
+
+ Future<string> f4Called = executor.execute(std::bind(f4, 42));
+
+ AWAIT_EXPECT_EQ(f4Result, f4Called);
}