You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2018/08/15 22:49:08 UTC

[mesos] 01/07: Updated libprocess filtering to take the Process UPID.

This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit d0df437ccd57c7b9934da5d753f783e2b7c6158c
Author: Benjamin Mahler <bm...@apache.org>
AuthorDate: Mon Aug 13 14:42:46 2018 -0700

    Updated libprocess filtering to take the Process UPID.
    
    When `EXPECT_DISPATCH` was originally introduced in de8e10830d178a7c,
    the pid was added to `DispatchEvent` rather than being passed through
    the filter API. This adds a performance overhead (we must perform an
    unnecessary `UPID` copy construction / destruction for every
    dispatch), and it also requires that we inject the `UPID` into
    additional events (for example, if we wanted to introduce
    `FUTURE_EXITED`, we need to add the UPID into `ExitedEvent`).
    
    Rather than continue this trend, we can instead pass the Process
    `UPID` through the filter API so that the client knows which Process
    the event belongs to. This also lets us remove the `UPID` copy
    construction / destruction cost from `DispatchEvent`, which should
    provide a minor performance benefit.
    
    Review: https://reviews.apache.org/r/68325
---
 3rdparty/libprocess/include/process/event.hpp  |   7 +-
 3rdparty/libprocess/include/process/filter.hpp |  42 +++++---
 3rdparty/libprocess/include/process/gmock.hpp  | 141 +++++++++++++++----------
 3rdparty/libprocess/src/process.cpp            |   4 +-
 4 files changed, 118 insertions(+), 76 deletions(-)

diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
index d96db74..6d7f812 100644
--- a/3rdparty/libprocess/include/process/event.hpp
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -178,11 +178,9 @@ struct HttpEvent : Event
 struct DispatchEvent : Event
 {
   DispatchEvent(
-      const UPID& _pid,
       std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> _f,
       const Option<const std::type_info*>& _functionType)
-    : pid(_pid),
-      f(std::move(_f)),
+    : f(std::move(_f)),
       functionType(_functionType)
   {}
 
@@ -201,9 +199,6 @@ struct DispatchEvent : Event
     consumer->consume(std::move(*this));
   }
 
-  // PID receiving the dispatch.
-  UPID pid;
-
   // Function to get invoked as a result of this dispatch event.
   std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f;
 
diff --git a/3rdparty/libprocess/include/process/filter.hpp b/3rdparty/libprocess/include/process/filter.hpp
index 79a1917..d637903 100644
--- a/3rdparty/libprocess/include/process/filter.hpp
+++ b/3rdparty/libprocess/include/process/filter.hpp
@@ -20,42 +20,60 @@ namespace process {
 class Filter {
 public:
   virtual ~Filter() {}
-  virtual bool filter(const MessageEvent&) { return false; }
-  virtual bool filter(const DispatchEvent&) { return false; }
-  virtual bool filter(const HttpEvent&) { return false; }
-  virtual bool filter(const ExitedEvent&) { return false; }
+  virtual bool filter(const UPID& process, const MessageEvent&)
+  {
+    return false;
+  }
+  virtual bool filter(const UPID& process, const DispatchEvent&)
+  {
+    return false;
+  }
+  virtual bool filter(const UPID& process, const HttpEvent&)
+  {
+    return false;
+  }
+  virtual bool filter(const UPID& process, const ExitedEvent&)
+  {
+    return false;
+  }
 
-  virtual bool filter(Event* event)
+  virtual bool filter(const UPID& process, Event* event)
   {
     bool result = false;
     struct FilterVisitor : EventVisitor
     {
-      explicit FilterVisitor(Filter* _filter, bool* _result)
-        : filter(_filter), result(_result) {}
+      explicit FilterVisitor(
+          Filter* _filter,
+          const UPID& _process,
+          bool* _result)
+        : filter(_filter),
+          process(_process),
+          result(_result) {}
 
       void visit(const MessageEvent& event) override
       {
-        *result = filter->filter(event);
+        *result = filter->filter(process, event);
       }
 
       void visit(const DispatchEvent& event) override
       {
-        *result = filter->filter(event);
+        *result = filter->filter(process, event);
       }
 
       void visit(const HttpEvent& event) override
       {
-        *result = filter->filter(event);
+        *result = filter->filter(process, event);
       }
 
       void visit(const ExitedEvent& event) override
       {
-        *result = filter->filter(event);
+        *result = filter->filter(process, event);
       }
 
       Filter* filter;
+      const UPID& process;
       bool* result;
-    } visitor(this, &result);
+    } visitor(this, process, &result);
 
     event->visit(&visitor);
 
diff --git a/3rdparty/libprocess/include/process/gmock.hpp b/3rdparty/libprocess/include/process/gmock.hpp
index e30f586..61b102e 100644
--- a/3rdparty/libprocess/include/process/gmock.hpp
+++ b/3rdparty/libprocess/include/process/gmock.hpp
@@ -233,20 +233,28 @@ class MockFilter : public Filter
 public:
   MockFilter()
   {
-    EXPECT_CALL(*this, filter(testing::A<const MessageEvent&>()))
+    EXPECT_CALL(*this, filter(
+        testing::A<const UPID&>(),
+        testing::A<const MessageEvent&>()))
       .WillRepeatedly(testing::Return(false));
-    EXPECT_CALL(*this, filter(testing::A<const DispatchEvent&>()))
+    EXPECT_CALL(*this, filter(
+        testing::A<const UPID&>(),
+        testing::A<const DispatchEvent&>()))
       .WillRepeatedly(testing::Return(false));
-    EXPECT_CALL(*this, filter(testing::A<const HttpEvent&>()))
+    EXPECT_CALL(*this, filter(
+        testing::A<const UPID&>(),
+        testing::A<const HttpEvent&>()))
       .WillRepeatedly(testing::Return(false));
-    EXPECT_CALL(*this, filter(testing::A<const ExitedEvent&>()))
+    EXPECT_CALL(*this, filter(
+        testing::A<const UPID&>(),
+        testing::A<const ExitedEvent&>()))
       .WillRepeatedly(testing::Return(false));
   }
 
-  MOCK_METHOD1(filter, bool(const MessageEvent&));
-  MOCK_METHOD1(filter, bool(const DispatchEvent&));
-  MOCK_METHOD1(filter, bool(const HttpEvent&));
-  MOCK_METHOD1(filter, bool(const ExitedEvent&));
+  MOCK_METHOD2(filter, bool(const UPID& process, const MessageEvent&));
+  MOCK_METHOD2(filter, bool(const UPID& process, const DispatchEvent&));
+  MOCK_METHOD2(filter, bool(const UPID& process, const HttpEvent&));
+  MOCK_METHOD2(filter, bool(const UPID& process, const ExitedEvent&));
 };
 
 
@@ -259,16 +267,28 @@ class TestsFilter : public Filter
 public:
   TestsFilter() = default;
 
-  bool filter(const MessageEvent& event) override { return handle(event); }
-  bool filter(const DispatchEvent& event) override { return handle(event); }
-  bool filter(const HttpEvent& event) override { return handle(event); }
-  bool filter(const ExitedEvent& event) override { return handle(event); }
+  bool filter(const UPID& process, const MessageEvent& event) override
+  {
+    return handle(process, event);
+  }
+  bool filter(const UPID& process, const DispatchEvent& event) override
+  {
+    return handle(process, event);
+  }
+  bool filter(const UPID& process, const HttpEvent& event) override
+  {
+    return handle(process, event);
+  }
+  bool filter(const UPID& process, const ExitedEvent& event) override
+  {
+    return handle(process, event);
+  }
 
   template <typename T>
-  bool handle(const T& t)
+  bool handle(const UPID& process, const T& t)
   {
     synchronized (mutex) {
-      return mock.filter(t);
+      return mock.filter(process, t);
     }
   }
 
@@ -343,44 +363,41 @@ private:
 };
 
 
-MATCHER_P3(MessageMatcher, name, from, to, "")
+MATCHER_P2(MessageMatcher, name, from, "")
 {
-  const MessageEvent& event = ::std::get<0>(arg);
+  const MessageEvent& event = ::std::get<1>(arg);
   return (testing::Matcher<std::string>(name).Matches(event.message.name) &&
-          testing::Matcher<UPID>(from).Matches(event.message.from) &&
-          testing::Matcher<UPID>(to).Matches(event.message.to));
+          testing::Matcher<UPID>(from).Matches(event.message.from));
 }
 
 
 // This matches protobuf messages that are described using the
 // standard protocol buffer "union" trick, see:
 // https://developers.google.com/protocol-buffers/docs/techniques#union.
-MATCHER_P4(UnionMessageMatcher, message, unionType, from, to, "")
+MATCHER_P3(UnionMessageMatcher, message, unionType, from, "")
 {
-  const process::MessageEvent& event = ::std::get<0>(arg);
+  const process::MessageEvent& event = ::std::get<1>(arg);
   message_type message;
 
   return (testing::Matcher<std::string>(message.GetTypeName()).Matches(
               event.message.name) &&
           message.ParseFromString(event.message.body) &&
           testing::Matcher<unionType_type>(unionType).Matches(message.type()) &&
-          testing::Matcher<process::UPID>(from).Matches(event.message.from) &&
-          testing::Matcher<process::UPID>(to).Matches(event.message.to));
+          testing::Matcher<process::UPID>(from).Matches(event.message.from));
 }
 
 
-MATCHER_P2(DispatchMatcher, pid, method, "")
+MATCHER_P(DispatchMatcher, method, "")
 {
-  const DispatchEvent& event = ::std::get<0>(arg);
-  return (testing::Matcher<UPID>(pid).Matches(event.pid) &&
-          event.functionType.isSome() &&
+  const DispatchEvent& event = ::std::get<1>(arg);
+  return (event.functionType.isSome() &&
           *event.functionType.get() == typeid(method));
 }
 
 
 MATCHER_P3(HttpMatcher, message, path, deserializer, "")
 {
-  const HttpEvent& event = ::std::get<0>(arg);
+  const HttpEvent& event = ::std::get<1>(arg);
 
   Try<message_type> message_ = deserializer(event.request->body);
   if (message_.isError()) {
@@ -395,7 +412,7 @@ MATCHER_P3(HttpMatcher, message, path, deserializer, "")
 // "union" trick.
 MATCHER_P4(UnionHttpMatcher, message, unionType, path, deserializer, "")
 {
-  const HttpEvent& event = ::std::get<0>(arg);
+  const HttpEvent& event = ::std::get<1>(arg);
 
   Try<message_type> message_ = deserializer(event.request->body);
   if (message_.isError()) {
@@ -418,9 +435,11 @@ Future<http::Request> FutureHttpRequest(
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   Future<http::Request> future;
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const HttpEvent&>()))
+    EXPECT_CALL(filter->mock, filter(
+        testing::A<const UPID&>(),
+        testing::A<const HttpEvent&>()))
       .With(HttpMatcher(message, path, deserializer))
-      .WillOnce(testing::DoAll(FutureArgField<0>(
+      .WillOnce(testing::DoAll(FutureArgField<1>(
                                    &HttpEvent::request,
                                    &future),
                                testing::Return(drop)))
@@ -445,9 +464,11 @@ Future<http::Request> FutureUnionHttpRequest(
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   Future<http::Request> future;
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const HttpEvent&>()))
+    EXPECT_CALL(filter->mock, filter(
+        testing::A<const UPID&>(),
+        testing::A<const HttpEvent&>()))
       .With(UnionHttpMatcher(message, unionType, path, deserializer))
-      .WillOnce(testing::DoAll(FutureArgField<0>(
+      .WillOnce(testing::DoAll(FutureArgField<1>(
                                    &HttpEvent::request,
                                    &future),
                                testing::Return(drop)))
@@ -464,9 +485,9 @@ Future<Message> FutureMessage(Name name, From from, To to, bool drop = false)
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   Future<Message> future;
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
-      .With(MessageMatcher(name, from, to))
-      .WillOnce(testing::DoAll(FutureArgNotPointerField<0>(
+    EXPECT_CALL(filter->mock, filter(to, testing::A<const MessageEvent&>()))
+      .With(MessageMatcher(name, from))
+      .WillOnce(testing::DoAll(FutureArgNotPointerField<1>(
                                    &MessageEvent::message,
                                    &future),
                                testing::Return(drop)))
@@ -486,9 +507,9 @@ Future<process::Message> FutureUnionMessage(
 
   Future<process::Message> future;
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
-      .With(UnionMessageMatcher(message, unionType, from, to))
-      .WillOnce(testing::DoAll(FutureArgNotPointerField<0>(
+    EXPECT_CALL(filter->mock, filter(to, testing::A<const MessageEvent&>()))
+      .With(UnionMessageMatcher(message, unionType, from))
+      .WillOnce(testing::DoAll(FutureArgNotPointerField<1>(
                                    &MessageEvent::message,
                                    &future),
                                testing::Return(drop)))
@@ -505,8 +526,8 @@ Future<Nothing> FutureDispatch(PID pid, Method method, bool drop = false)
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   Future<Nothing> future;
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
-      .With(DispatchMatcher(pid, method))
+    EXPECT_CALL(filter->mock, filter(pid, testing::A<const DispatchEvent&>()))
+      .With(DispatchMatcher(method))
       .WillOnce(testing::DoAll(FutureSatisfy(&future),
                               testing::Return(drop)))
       .RetiresOnSaturation(); // Don't impose any subsequent expectations.
@@ -521,8 +542,8 @@ void DropMessages(Name name, From from, To to)
 {
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
-      .With(MessageMatcher(name, from, to))
+    EXPECT_CALL(filter->mock, filter(to, testing::A<const MessageEvent&>()))
+      .With(MessageMatcher(name, from))
       .WillRepeatedly(testing::Return(true));
   }
 }
@@ -533,8 +554,8 @@ void DropUnionMessages(Message message, UnionType unionType, From from, To to)
 {
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
-      .With(UnionMessageMatcher(message, unionType, from, to))
+    EXPECT_CALL(filter->mock, filter(to, testing::A<const MessageEvent&>()))
+      .With(UnionMessageMatcher(message, unionType, from))
       .WillRepeatedly(testing::Return(true));
   }
 }
@@ -549,7 +570,9 @@ void DropHttpRequests(
 {
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const HttpEvent&>()))
+    EXPECT_CALL(filter->mock, filter(
+        testing::A<const UPID&>(),
+        testing::A<const HttpEvent&>()))
       .With(HttpMatcher(message, path, deserializer))
       .WillRepeatedly(testing::Return(true));
   }
@@ -570,7 +593,9 @@ void DropUnionHttpRequests(
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   Future<http::Request> future;
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const HttpEvent&>()))
+    EXPECT_CALL(filter->mock, filter(
+        testing::A<const UPID&>(),
+        testing::A<const HttpEvent&>()))
       .With(UnionHttpMatcher(message, unionType, path, deserializer))
       .WillRepeatedly(testing::Return(true));
   }
@@ -586,7 +611,9 @@ void ExpectNoFutureHttpRequests(
 {
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const HttpEvent&>()))
+    EXPECT_CALL(filter->mock, filter(
+        testing::A<const UPID&>(),
+        testing::A<const HttpEvent&>()))
       .With(HttpMatcher(message, path, deserializer))
       .Times(0);
   }
@@ -607,7 +634,9 @@ void ExpectNoFutureUnionHttpRequests(
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   Future<http::Request> future;
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const HttpEvent&>()))
+    EXPECT_CALL(filter->mock, filter(
+        testing::A<const UPID&>(),
+        testing::A<const HttpEvent&>()))
       .With(UnionHttpMatcher(message, unionType, path, deserializer))
       .Times(0);
   }
@@ -619,8 +648,8 @@ void ExpectNoFutureMessages(Name name, From from, To to)
 {
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
-      .With(MessageMatcher(name, from, to))
+    EXPECT_CALL(filter->mock, filter(to, testing::A<const MessageEvent&>()))
+      .With(MessageMatcher(name, from))
       .Times(0);
   }
 }
@@ -632,8 +661,8 @@ void ExpectNoFutureUnionMessages(
 {
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
-      .With(UnionMessageMatcher(message, unionType, from, to))
+    EXPECT_CALL(filter->mock, filter(to, testing::A<const MessageEvent&>()))
+      .With(UnionMessageMatcher(message, unionType, from))
       .Times(0);
   }
 }
@@ -644,8 +673,8 @@ void DropDispatches(PID pid, Method method)
 {
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
-      .With(DispatchMatcher(pid, method))
+    EXPECT_CALL(filter->mock, filter(pid, testing::A<const DispatchEvent&>()))
+      .With(DispatchMatcher(method))
       .WillRepeatedly(testing::Return(true));
   }
 }
@@ -656,8 +685,8 @@ void ExpectNoFutureDispatches(PID pid, Method method)
 {
   TestsFilter* filter = FilterTestEventListener::instance()->install();
   synchronized (filter->mutex) {
-    EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
-      .With(DispatchMatcher(pid, method))
+    EXPECT_CALL(filter->mock, filter(pid, testing::A<const DispatchEvent&>()))
+      .With(DispatchMatcher(method))
       .Times(0);
   }
 }
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 23f7ce8..fe89f55 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -2970,7 +2970,7 @@ void ProcessManager::resume(ProcessBase* process)
       if (filter.load() != nullptr) {
         synchronized (filter_mutex) {
           Filter* f = filter.load();
-          if (f != nullptr && f->filter(event)) {
+          if (f != nullptr && f->filter(process->self(), event)) {
             delete event;
             continue; // Try and execute the next event.
           }
@@ -4024,7 +4024,7 @@ void dispatch(
 {
   process::initialize();
 
-  DispatchEvent* event = new DispatchEvent(pid, std::move(f), functionType);
+  DispatchEvent* event = new DispatchEvent(std::move(f), functionType);
   process_manager->deliver(pid, event, __process__);
 }