You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/07/19 20:21:26 UTC

[7/7] mesos git commit: Added double-checked locking for filter.

Added double-checked locking for filter.

When running in"production" a filter will not be (likely) be set yet
with the current code the threads will experience head-of-line
blocking becaues they'll all queue up on the mutex. Test code will
still queue up but we don't care about the performance in these
situations!

Note that this patch also moves the filter into ProcessManager in the
effort towards eliminating globals.

Review: https://reviews.apache.org/r/60871


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8f42d0c1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8f42d0c1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8f42d0c1

Branch: refs/heads/master
Commit: 8f42d0c112748e138cc607b945455045fdac4b20
Parents: 102e869
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jul 14 09:16:03 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Jul 19 13:20:21 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/filter.hpp | 37 ++++++++++
 3rdparty/libprocess/src/process.cpp            | 81 ++++++++++-----------
 2 files changed, 75 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8f42d0c1/3rdparty/libprocess/include/process/filter.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/filter.hpp b/3rdparty/libprocess/include/process/filter.hpp
index cafa7be..3f4a827 100644
--- a/3rdparty/libprocess/include/process/filter.hpp
+++ b/3rdparty/libprocess/include/process/filter.hpp
@@ -24,6 +24,43 @@ public:
   virtual bool filter(const DispatchEvent&) { return false; }
   virtual bool filter(const HttpEvent&) { return false; }
   virtual bool filter(const ExitedEvent&) { return false; }
+
+  virtual bool filter(Event* event)
+  {
+    bool result = false;
+    struct FilterVisitor : EventVisitor
+    {
+      explicit FilterVisitor(Filter* _filter, bool* _result)
+        : filter(_filter), result(_result) {}
+
+      virtual void visit(const MessageEvent& event)
+      {
+        *result = filter->filter(event);
+      }
+
+      virtual void visit(const DispatchEvent& event)
+      {
+        *result = filter->filter(event);
+      }
+
+      virtual void visit(const HttpEvent& event)
+      {
+        *result = filter->filter(event);
+      }
+
+      virtual void visit(const ExitedEvent& event)
+      {
+        *result = filter->filter(event);
+      }
+
+      Filter* filter;
+      bool* result;
+    } visitor(this, &result);
+
+    event->visit(&visitor);
+
+    return result;
+  }
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8f42d0c1/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 4ddeba3..b268cda 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -561,6 +561,18 @@ public:
   // The /__processes__ route.
   Future<Response> __processes__(const Request&);
 
+  void install(Filter* f)
+  {
+    // NOTE: even though `filter` is atomic we still need to
+    // synchronize updating it because once we return from this
+    // function the old filter might get deleted which could be bad if
+    // a thread is currently using the old filter in
+    // `ProcessManager::resume`.
+    synchronized (filter_mutex) {
+      filter.store(f);
+    }
+  }
+
 private:
   // Delegate process name to receive root HTTP requests.
   const Option<string> delegate;
@@ -594,6 +606,13 @@ private:
   // Whether the process manager is finalizing or not.
   // If true, no further processes will be spawned.
   std::atomic_bool finalizing;
+
+  // Filter. Synchronized support for using the filter needs to be
+  // recursive in case a filter wants to do anything fancy (which is
+  // possible and likely given that filters will get used for
+  // testing).
+  std::atomic<Filter*> filter = ATOMIC_VAR_INIT(nullptr);
+  std::recursive_mutex filter_mutex;
 };
 
 
@@ -640,12 +659,6 @@ static AuthorizationCallbacks* authorization_callbacks = nullptr;
 // Global route that returns process information.
 static Route* processes_route = nullptr;
 
-// Filter. Synchronized support for using the filterer needs to be
-// recursive in case a filterer wants to do anything fancy (which is
-// possible and likely given that filters will get used for testing).
-static Filter* filterer = nullptr;
-static std::recursive_mutex* filterer_mutex = new std::recursive_mutex();
-
 // Global garbage collector.
 GarbageCollector* gc = nullptr;
 
@@ -3198,39 +3211,23 @@ void ProcessManager::resume(ProcessBase* process)
       CHECK(event != nullptr);
 
       // Determine if we should filter this event.
-      synchronized (filterer_mutex) {
-        if (filterer != nullptr) {
-          bool filter = false;
-          struct FilterVisitor : EventVisitor
-          {
-            explicit FilterVisitor(bool* _filter) : filter(_filter) {}
-
-            virtual void visit(const MessageEvent& event)
-            {
-              *filter = filterer->filter(event);
-            }
-
-            virtual void visit(const DispatchEvent& event)
-            {
-              *filter = filterer->filter(event);
-            }
-
-            virtual void visit(const HttpEvent& event)
-            {
-              *filter = filterer->filter(event);
-            }
-
-            virtual void visit(const ExitedEvent& event)
-            {
-              *filter = filterer->filter(event);
-            }
-
-            bool* filter;
-          } visitor(&filter);
-
-          event->visit(&visitor);
-
-          if (filter) {
+      //
+      // NOTE: we use double-checked locking here to avoid
+      // head-of-line blocking that occurs when the first thread
+      // attempts to filter an event.
+      //
+      // TODO(benh): While not critical for production systems because
+      // the filter should not be set in production systems, we could
+      // use a reader/writer lock here in addition to double-checked
+      // locking.
+      //
+      // TODO(benh): Consider optimizing this further to not be
+      // sequentially consistent. For more details see:
+      // http://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11.
+      if (filter.load() != nullptr) {
+        synchronized (filter_mutex) {
+          Filter* f = filter.load();
+          if (f != nullptr && f->filter(event)) {
             delete event;
             continue; // Try and execute the next event.
           }
@@ -4180,13 +4177,11 @@ bool wait(const UPID& pid, const Duration& duration)
 }
 
 
-void filter(Filter *filter)
+void filter(Filter* filter)
 {
   process::initialize();
 
-  synchronized (filterer_mutex) {
-    filterer = filter;
-  }
+  process_manager->install(filter);
 }