You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/09/27 01:01:41 UTC

[1/3] mesos git commit: Added MESOS-7964 to 1.4.1 CHANGELOG.

Repository: mesos
Updated Branches:
  refs/heads/1.4.x f82cd43dc -> 8613a7a8b


Added MESOS-7964 to 1.4.1 CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8613a7a8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8613a7a8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8613a7a8

Branch: refs/heads/1.4.x
Commit: 8613a7a8b953a722f19fc59e07dc327c58033406
Parents: 27b8356
Author: Jiang Yan Xu <xu...@apple.com>
Authored: Tue Sep 26 17:25:45 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Sep 26 17:30:12 2017 -0700

----------------------------------------------------------------------
 CHANGELOG | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8613a7a8/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 16b1db2..00a8dcc 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -6,6 +6,7 @@ All Resolved Issues:
 
 **Bug
  * [MESOS-7873] - Expose `ExecutorInfo.ContainerInfo.NetworkInfo` in Mesos `state` endpoint.
+ * [MESOS-7964] - Heavy-duty GC makes the agent unresponsive.
  * [MESOS-7968] - Handle `/proc/self/ns/pid_for_children` when parsing available namespace.
  * [MESOS-7969] - Handle cgroups v2 hierarchy when parsing /proc/self/cgroups.
  * [MESOS-7980] - Stout fails to compile with libc >= 2.26.


[2/3] mesos git commit: Prevent GC path removals from blocking other processes.

Posted by ya...@apache.org.
Prevent GC path removals from blocking other processes.

This patch dispatches all path removals to a single executor instead of
one `AsyncExecutor` per path such that heavy-duty removals won't occupy
all worker threads and block other actors.

Review: https://reviews.apache.org/r/62230/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/27b83565
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/27b83565
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/27b83565

Branch: refs/heads/1.4.x
Commit: 27b83565082720cbc9c93b3b892305b899af84b7
Parents: bf82953
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Sep 26 17:07:11 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Sep 26 17:30:12 2017 -0700

----------------------------------------------------------------------
 src/slave/gc.cpp | 6 ++++--
 src/slave/gc.hpp | 4 ++++
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/27b83565/src/slave/gc.cpp
----------------------------------------------------------------------
diff --git a/src/slave/gc.cpp b/src/slave/gc.cpp
index f270fa4..86d94c8 100644
--- a/src/slave/gc.cpp
+++ b/src/slave/gc.cpp
@@ -16,7 +16,6 @@
 
 #include <list>
 
-#include <process/async.hpp>
 #include <process/check.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
@@ -183,7 +182,10 @@ void GarbageCollectorProcess::remove(const Timeout& removalTime)
       return Nothing();
     };
 
-    async(rmdirs)
+    // NOTE: All `rmdirs` calls are dispatched to one executor so that:
+    //   1. They do not block other dispatches (MESOS-6549).
+    //   2. They do not occupy all worker threads (MESOS-7964).
+    executor.execute(rmdirs)
       .onAny(defer(self(), &Self::_remove, lambda::_1, infos));
   } else {
     // This occurs when either:

http://git-wip-us.apache.org/repos/asf/mesos/blob/27b83565/src/slave/gc.hpp
----------------------------------------------------------------------
diff --git a/src/slave/gc.hpp b/src/slave/gc.hpp
index b8d5301..f23ffd3 100644
--- a/src/slave/gc.hpp
+++ b/src/slave/gc.hpp
@@ -21,6 +21,7 @@
 #include <string>
 #include <vector>
 
+#include <process/executor.hpp>
 #include <process/future.hpp>
 #include <process/id.hpp>
 #include <process/owned.hpp>
@@ -140,6 +141,9 @@ private:
   hashmap<std::string, process::Timeout> timeouts;
 
   process::Timer timer;
+
+  // For executing path removals in a separate actor.
+  process::Executor executor;
 };
 
 } // namespace slave {


[3/3] mesos git commit: Added `process::Executor::execute()`.

Posted by ya...@apache.org.
Added `process::Executor::execute()`.

This patch adds a convenient interface to `process::Executor` to
asynchronously execute arbitrary functions.

Review: https://reviews.apache.org/r/62252/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bf82953f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bf82953f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bf82953f

Branch: refs/heads/1.4.x
Commit: bf82953f1ede7ddf182f9cad79a3248ef2630dc8
Parents: f82cd43
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Mon Sep 25 14:10:27 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Sep 26 17:30:12 2017 -0700

----------------------------------------------------------------------
 .../libprocess/include/process/executor.hpp     | 46 ++++++++++-
 3rdparty/libprocess/src/tests/process_tests.cpp | 85 +++++++++++++++++---
 2 files changed, 115 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bf82953f/3rdparty/libprocess/include/process/executor.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/executor.hpp b/3rdparty/libprocess/include/process/executor.hpp
index cd2f2f0..95a9e69 100644
--- a/3rdparty/libprocess/include/process/executor.hpp
+++ b/3rdparty/libprocess/include/process/executor.hpp
@@ -17,12 +17,15 @@
 #include <process/id.hpp>
 #include <process/process.hpp>
 
+#include <stout/nothing.hpp>
+#include <stout/result_of.hpp>
+
 namespace process {
 
 // Provides an abstraction that can take a standard function object
-// and defer it without needing a process. Each converted function
-// object will get execute serially with respect to one another when
-// invoked.
+// and defer or asynchronously execute it without needing a process.
+// Each converted function object will get executed serially with respect
+// to one another when invoked.
 class Executor
 {
 public:
@@ -39,7 +42,7 @@ public:
 
   void stop()
   {
-    terminate(&process);
+    terminate(process);
 
     // TODO(benh): Note that this doesn't wait because that could
     // cause a deadlock ... thus, the semantics here are that no more
@@ -60,6 +63,41 @@ private:
   Executor& operator=(const Executor&);
 
   ProcessBase process;
+
+
+public:
+  template <
+      typename F,
+      typename R = typename result_of<F()>::type,
+      typename std::enable_if<!std::is_void<R>::value, int>::type = 0>
+  auto execute(F&& f)
+    -> decltype(dispatch(process, std::function<R()>(std::forward<F>(f))))
+  {
+    // NOTE: Currently we cannot pass a mutable lambda into `dispatch()`
+    // because it would be captured by copy, so we convert `f` into a
+    // `std::function` to bypass this restriction.
+    return dispatch(process, std::function<R()>(std::forward<F>(f)));
+  }
+
+  // NOTE: This overload for `void` returns `Future<Nothing>` so we can
+  // chain. This follows the same behavior as `async()`.
+  template <
+      typename F,
+      typename R = typename result_of<F()>::type,
+      typename std::enable_if<std::is_void<R>::value, int>::type = 0>
+  Future<Nothing> execute(F&& f)
+  {
+    // NOTE: Currently we cannot pass a mutable lambda into `dispatch()`
+    // because it would be captured by copy, so we convert `f` into a
+    // `std::function` to bypass this restriction. This wrapper also
+    // avoids `f` being evaluated when it is a nested bind.
+    // TODO(chhsiao): Capture `f` by forwarding once we switch to C++14.
+    return dispatch(
+        process,
+        std::bind(
+            [](const std::function<R()>& f_) { f_(); return Nothing(); },
+            std::function<R()>(std::forward<F>(f))));
+  }
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bf82953f/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 82efb2f..e494318 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -31,6 +31,7 @@
 
 #include <process/async.hpp>
 #include <process/clock.hpp>
+#include <process/count_down_latch.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
@@ -70,6 +71,7 @@ namespace inet4 = process::network::inet4;
 
 using process::async;
 using process::Clock;
+using process::CountDownLatch;
 using process::defer;
 using process::Deferred;
 using process::Event;
@@ -102,6 +104,7 @@ using std::vector;
 using testing::_;
 using testing::Assign;
 using testing::DoAll;
+using testing::InvokeWithoutArgs;
 using testing::Return;
 using testing::ReturnArg;
 
@@ -1201,20 +1204,17 @@ public:
 };
 
 
-TEST(ProcessTest, THREADSAFE_Executor)
+TEST(ProcessTest, THREADSAFE_Executor_Defer)
 {
-  std::atomic_bool event1Called(false);
-  std::atomic_bool event2Called(false);
-
   EventReceiver receiver;
+  Executor executor;
 
-  EXPECT_CALL(receiver, event1(42))
-    .WillOnce(Assign(&event1Called, true));
-
-  EXPECT_CALL(receiver, event2("event2"))
-    .WillOnce(Assign(&event2Called, true));
+  CountDownLatch event1Called;
 
-  Executor executor;
+  EXPECT_CALL(receiver, event1(42))
+    .WillOnce(InvokeWithoutArgs([&]() {
+      event1Called.decrement();
+    }));
 
   Deferred<void(int)> event1 =
     executor.defer([&receiver](int i) {
@@ -1223,6 +1223,15 @@ TEST(ProcessTest, THREADSAFE_Executor)
 
   event1(42);
 
+  AWAIT_READY(event1Called.triggered());
+
+  CountDownLatch event2Called;
+
+  EXPECT_CALL(receiver, event2("event2"))
+    .WillOnce(InvokeWithoutArgs([&]() {
+      event2Called.decrement();
+    }));
+
   Deferred<void(const string&)> event2 =
     executor.defer([&receiver](const string& s) {
       return receiver.event2(s);
@@ -1230,8 +1239,60 @@ TEST(ProcessTest, THREADSAFE_Executor)
 
   event2("event2");
 
-  while (event1Called.load() == false);
-  while (event2Called.load() == false);
+  AWAIT_READY(event2Called.triggered());
+}
+
+
+TEST(ProcessTest, THREADSAFE_Executor_Execute)
+{
+  Executor executor;
+
+  // A void immutable lambda.
+  CountDownLatch f1Result;
+  auto f1 = [&f1Result] {
+    f1Result.decrement();
+  };
+
+  // Specify the return type explicitly for type checking. Same below.
+  Future<Nothing> f1Called = executor.execute(f1);
+
+  AWAIT_READY(f1Called);
+  AWAIT_READY(f1Result.triggered());
+
+  // A void mutable bind.
+  CountDownLatch f2Result;
+  int f2State = 0;
+  auto f2 = [&f2Result, f2State](int) mutable -> void {
+    f2State++;
+    f2Result.decrement();
+  };
+
+  Future<Nothing> f2Called = executor.execute(std::bind(f2, 42));
+
+  AWAIT_READY(f2Called);
+  AWAIT_READY(f2Result.triggered());
+
+  // A non-void immutable lambda.
+  const string f3Result = "f3";
+  auto f3 = [&f3Result] {
+    return f3Result;
+  };
+
+  Future<string> f3Called = executor.execute(f3);
+
+  AWAIT_EXPECT_EQ(f3Result, f3Called);
+
+  // A mutable bind returning a future.
+  const string f4Result = "f4";
+  int f4State = 0;
+  auto f4 = [&f4Result, f4State](int) mutable -> Future<string> {
+    f4State++;
+    return f4Result;
+  };
+
+  Future<string> f4Called = executor.execute(std::bind(f4, 42));
+
+  AWAIT_EXPECT_EQ(f4Result, f4Called);
 }