You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2018/08/15 22:49:08 UTC
[mesos] 01/07: Updated libprocess filtering to take the Process
UPID.
This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit d0df437ccd57c7b9934da5d753f783e2b7c6158c
Author: Benjamin Mahler <bm...@apache.org>
AuthorDate: Mon Aug 13 14:42:46 2018 -0700
Updated libprocess filtering to take the Process UPID.
When `EXPECT_DISPATCH` was originally introduced in de8e10830d178a7c,
the pid was added to `DispatchEvent` rather than being passed through
the filter API. This adds a performance overhead (we must perform an
unnecessary `UPID` copy construction / destruction for every
dispatch), and it also requires that we inject the `UPID` into
additional events (for example, if we wanted to introduce
`FUTURE_EXITED`, we need to add the UPID into `ExitedEvent`).
Rather than continue this trend, we can instead pass the Process
`UPID` through the filter API so that the client knows which Process
the event belongs to. This also lets us remove the `UPID` copy
construction / destruction cost from `DispatchEvent`, which should
provide a minor performance benefit.
Review: https://reviews.apache.org/r/68325
---
3rdparty/libprocess/include/process/event.hpp | 7 +-
3rdparty/libprocess/include/process/filter.hpp | 42 +++++---
3rdparty/libprocess/include/process/gmock.hpp | 141 +++++++++++++++----------
3rdparty/libprocess/src/process.cpp | 4 +-
4 files changed, 118 insertions(+), 76 deletions(-)
diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
index d96db74..6d7f812 100644
--- a/3rdparty/libprocess/include/process/event.hpp
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -178,11 +178,9 @@ struct HttpEvent : Event
struct DispatchEvent : Event
{
DispatchEvent(
- const UPID& _pid,
std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> _f,
const Option<const std::type_info*>& _functionType)
- : pid(_pid),
- f(std::move(_f)),
+ : f(std::move(_f)),
functionType(_functionType)
{}
@@ -201,9 +199,6 @@ struct DispatchEvent : Event
consumer->consume(std::move(*this));
}
- // PID receiving the dispatch.
- UPID pid;
-
// Function to get invoked as a result of this dispatch event.
std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f;
diff --git a/3rdparty/libprocess/include/process/filter.hpp b/3rdparty/libprocess/include/process/filter.hpp
index 79a1917..d637903 100644
--- a/3rdparty/libprocess/include/process/filter.hpp
+++ b/3rdparty/libprocess/include/process/filter.hpp
@@ -20,42 +20,60 @@ namespace process {
class Filter {
public:
virtual ~Filter() {}
- virtual bool filter(const MessageEvent&) { return false; }
- virtual bool filter(const DispatchEvent&) { return false; }
- virtual bool filter(const HttpEvent&) { return false; }
- virtual bool filter(const ExitedEvent&) { return false; }
+ virtual bool filter(const UPID& process, const MessageEvent&)
+ {
+ return false;
+ }
+ virtual bool filter(const UPID& process, const DispatchEvent&)
+ {
+ return false;
+ }
+ virtual bool filter(const UPID& process, const HttpEvent&)
+ {
+ return false;
+ }
+ virtual bool filter(const UPID& process, const ExitedEvent&)
+ {
+ return false;
+ }
- virtual bool filter(Event* event)
+ virtual bool filter(const UPID& process, Event* event)
{
bool result = false;
struct FilterVisitor : EventVisitor
{
- explicit FilterVisitor(Filter* _filter, bool* _result)
- : filter(_filter), result(_result) {}
+ explicit FilterVisitor(
+ Filter* _filter,
+ const UPID& _process,
+ bool* _result)
+ : filter(_filter),
+ process(_process),
+ result(_result) {}
void visit(const MessageEvent& event) override
{
- *result = filter->filter(event);
+ *result = filter->filter(process, event);
}
void visit(const DispatchEvent& event) override
{
- *result = filter->filter(event);
+ *result = filter->filter(process, event);
}
void visit(const HttpEvent& event) override
{
- *result = filter->filter(event);
+ *result = filter->filter(process, event);
}
void visit(const ExitedEvent& event) override
{
- *result = filter->filter(event);
+ *result = filter->filter(process, event);
}
Filter* filter;
+ const UPID& process;
bool* result;
- } visitor(this, &result);
+ } visitor(this, process, &result);
event->visit(&visitor);
diff --git a/3rdparty/libprocess/include/process/gmock.hpp b/3rdparty/libprocess/include/process/gmock.hpp
index e30f586..61b102e 100644
--- a/3rdparty/libprocess/include/process/gmock.hpp
+++ b/3rdparty/libprocess/include/process/gmock.hpp
@@ -233,20 +233,28 @@ class MockFilter : public Filter
public:
MockFilter()
{
- EXPECT_CALL(*this, filter(testing::A<const MessageEvent&>()))
+ EXPECT_CALL(*this, filter(
+ testing::A<const UPID&>(),
+ testing::A<const MessageEvent&>()))
.WillRepeatedly(testing::Return(false));
- EXPECT_CALL(*this, filter(testing::A<const DispatchEvent&>()))
+ EXPECT_CALL(*this, filter(
+ testing::A<const UPID&>(),
+ testing::A<const DispatchEvent&>()))
.WillRepeatedly(testing::Return(false));
- EXPECT_CALL(*this, filter(testing::A<const HttpEvent&>()))
+ EXPECT_CALL(*this, filter(
+ testing::A<const UPID&>(),
+ testing::A<const HttpEvent&>()))
.WillRepeatedly(testing::Return(false));
- EXPECT_CALL(*this, filter(testing::A<const ExitedEvent&>()))
+ EXPECT_CALL(*this, filter(
+ testing::A<const UPID&>(),
+ testing::A<const ExitedEvent&>()))
.WillRepeatedly(testing::Return(false));
}
- MOCK_METHOD1(filter, bool(const MessageEvent&));
- MOCK_METHOD1(filter, bool(const DispatchEvent&));
- MOCK_METHOD1(filter, bool(const HttpEvent&));
- MOCK_METHOD1(filter, bool(const ExitedEvent&));
+ MOCK_METHOD2(filter, bool(const UPID& process, const MessageEvent&));
+ MOCK_METHOD2(filter, bool(const UPID& process, const DispatchEvent&));
+ MOCK_METHOD2(filter, bool(const UPID& process, const HttpEvent&));
+ MOCK_METHOD2(filter, bool(const UPID& process, const ExitedEvent&));
};
@@ -259,16 +267,28 @@ class TestsFilter : public Filter
public:
TestsFilter() = default;
- bool filter(const MessageEvent& event) override { return handle(event); }
- bool filter(const DispatchEvent& event) override { return handle(event); }
- bool filter(const HttpEvent& event) override { return handle(event); }
- bool filter(const ExitedEvent& event) override { return handle(event); }
+ bool filter(const UPID& process, const MessageEvent& event) override
+ {
+ return handle(process, event);
+ }
+ bool filter(const UPID& process, const DispatchEvent& event) override
+ {
+ return handle(process, event);
+ }
+ bool filter(const UPID& process, const HttpEvent& event) override
+ {
+ return handle(process, event);
+ }
+ bool filter(const UPID& process, const ExitedEvent& event) override
+ {
+ return handle(process, event);
+ }
template <typename T>
- bool handle(const T& t)
+ bool handle(const UPID& process, const T& t)
{
synchronized (mutex) {
- return mock.filter(t);
+ return mock.filter(process, t);
}
}
@@ -343,44 +363,41 @@ private:
};
-MATCHER_P3(MessageMatcher, name, from, to, "")
+MATCHER_P2(MessageMatcher, name, from, "")
{
- const MessageEvent& event = ::std::get<0>(arg);
+ const MessageEvent& event = ::std::get<1>(arg);
return (testing::Matcher<std::string>(name).Matches(event.message.name) &&
- testing::Matcher<UPID>(from).Matches(event.message.from) &&
- testing::Matcher<UPID>(to).Matches(event.message.to));
+ testing::Matcher<UPID>(from).Matches(event.message.from));
}
// This matches protobuf messages that are described using the
// standard protocol buffer "union" trick, see:
// https://developers.google.com/protocol-buffers/docs/techniques#union.
-MATCHER_P4(UnionMessageMatcher, message, unionType, from, to, "")
+MATCHER_P3(UnionMessageMatcher, message, unionType, from, "")
{
- const process::MessageEvent& event = ::std::get<0>(arg);
+ const process::MessageEvent& event = ::std::get<1>(arg);
message_type message;
return (testing::Matcher<std::string>(message.GetTypeName()).Matches(
event.message.name) &&
message.ParseFromString(event.message.body) &&
testing::Matcher<unionType_type>(unionType).Matches(message.type()) &&
- testing::Matcher<process::UPID>(from).Matches(event.message.from) &&
- testing::Matcher<process::UPID>(to).Matches(event.message.to));
+ testing::Matcher<process::UPID>(from).Matches(event.message.from));
}
-MATCHER_P2(DispatchMatcher, pid, method, "")
+MATCHER_P(DispatchMatcher, method, "")
{
- const DispatchEvent& event = ::std::get<0>(arg);
- return (testing::Matcher<UPID>(pid).Matches(event.pid) &&
- event.functionType.isSome() &&
+ const DispatchEvent& event = ::std::get<1>(arg);
+ return (event.functionType.isSome() &&
*event.functionType.get() == typeid(method));
}
MATCHER_P3(HttpMatcher, message, path, deserializer, "")
{
- const HttpEvent& event = ::std::get<0>(arg);
+ const HttpEvent& event = ::std::get<1>(arg);
Try<message_type> message_ = deserializer(event.request->body);
if (message_.isError()) {
@@ -395,7 +412,7 @@ MATCHER_P3(HttpMatcher, message, path, deserializer, "")
// "union" trick.
MATCHER_P4(UnionHttpMatcher, message, unionType, path, deserializer, "")
{
- const HttpEvent& event = ::std::get<0>(arg);
+ const HttpEvent& event = ::std::get<1>(arg);
Try<message_type> message_ = deserializer(event.request->body);
if (message_.isError()) {
@@ -418,9 +435,11 @@ Future<http::Request> FutureHttpRequest(
TestsFilter* filter = FilterTestEventListener::instance()->install();
Future<http::Request> future;
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const HttpEvent&>()))
+ EXPECT_CALL(filter->mock, filter(
+ testing::A<const UPID&>(),
+ testing::A<const HttpEvent&>()))
.With(HttpMatcher(message, path, deserializer))
- .WillOnce(testing::DoAll(FutureArgField<0>(
+ .WillOnce(testing::DoAll(FutureArgField<1>(
&HttpEvent::request,
&future),
testing::Return(drop)))
@@ -445,9 +464,11 @@ Future<http::Request> FutureUnionHttpRequest(
TestsFilter* filter = FilterTestEventListener::instance()->install();
Future<http::Request> future;
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const HttpEvent&>()))
+ EXPECT_CALL(filter->mock, filter(
+ testing::A<const UPID&>(),
+ testing::A<const HttpEvent&>()))
.With(UnionHttpMatcher(message, unionType, path, deserializer))
- .WillOnce(testing::DoAll(FutureArgField<0>(
+ .WillOnce(testing::DoAll(FutureArgField<1>(
&HttpEvent::request,
&future),
testing::Return(drop)))
@@ -464,9 +485,9 @@ Future<Message> FutureMessage(Name name, From from, To to, bool drop = false)
TestsFilter* filter = FilterTestEventListener::instance()->install();
Future<Message> future;
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
- .With(MessageMatcher(name, from, to))
- .WillOnce(testing::DoAll(FutureArgNotPointerField<0>(
+ EXPECT_CALL(filter->mock, filter(to, testing::A<const MessageEvent&>()))
+ .With(MessageMatcher(name, from))
+ .WillOnce(testing::DoAll(FutureArgNotPointerField<1>(
&MessageEvent::message,
&future),
testing::Return(drop)))
@@ -486,9 +507,9 @@ Future<process::Message> FutureUnionMessage(
Future<process::Message> future;
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
- .With(UnionMessageMatcher(message, unionType, from, to))
- .WillOnce(testing::DoAll(FutureArgNotPointerField<0>(
+ EXPECT_CALL(filter->mock, filter(to, testing::A<const MessageEvent&>()))
+ .With(UnionMessageMatcher(message, unionType, from))
+ .WillOnce(testing::DoAll(FutureArgNotPointerField<1>(
&MessageEvent::message,
&future),
testing::Return(drop)))
@@ -505,8 +526,8 @@ Future<Nothing> FutureDispatch(PID pid, Method method, bool drop = false)
TestsFilter* filter = FilterTestEventListener::instance()->install();
Future<Nothing> future;
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
- .With(DispatchMatcher(pid, method))
+ EXPECT_CALL(filter->mock, filter(pid, testing::A<const DispatchEvent&>()))
+ .With(DispatchMatcher(method))
.WillOnce(testing::DoAll(FutureSatisfy(&future),
testing::Return(drop)))
.RetiresOnSaturation(); // Don't impose any subsequent expectations.
@@ -521,8 +542,8 @@ void DropMessages(Name name, From from, To to)
{
TestsFilter* filter = FilterTestEventListener::instance()->install();
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
- .With(MessageMatcher(name, from, to))
+ EXPECT_CALL(filter->mock, filter(to, testing::A<const MessageEvent&>()))
+ .With(MessageMatcher(name, from))
.WillRepeatedly(testing::Return(true));
}
}
@@ -533,8 +554,8 @@ void DropUnionMessages(Message message, UnionType unionType, From from, To to)
{
TestsFilter* filter = FilterTestEventListener::instance()->install();
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
- .With(UnionMessageMatcher(message, unionType, from, to))
+ EXPECT_CALL(filter->mock, filter(to, testing::A<const MessageEvent&>()))
+ .With(UnionMessageMatcher(message, unionType, from))
.WillRepeatedly(testing::Return(true));
}
}
@@ -549,7 +570,9 @@ void DropHttpRequests(
{
TestsFilter* filter = FilterTestEventListener::instance()->install();
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const HttpEvent&>()))
+ EXPECT_CALL(filter->mock, filter(
+ testing::A<const UPID&>(),
+ testing::A<const HttpEvent&>()))
.With(HttpMatcher(message, path, deserializer))
.WillRepeatedly(testing::Return(true));
}
@@ -570,7 +593,9 @@ void DropUnionHttpRequests(
TestsFilter* filter = FilterTestEventListener::instance()->install();
Future<http::Request> future;
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const HttpEvent&>()))
+ EXPECT_CALL(filter->mock, filter(
+ testing::A<const UPID&>(),
+ testing::A<const HttpEvent&>()))
.With(UnionHttpMatcher(message, unionType, path, deserializer))
.WillRepeatedly(testing::Return(true));
}
@@ -586,7 +611,9 @@ void ExpectNoFutureHttpRequests(
{
TestsFilter* filter = FilterTestEventListener::instance()->install();
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const HttpEvent&>()))
+ EXPECT_CALL(filter->mock, filter(
+ testing::A<const UPID&>(),
+ testing::A<const HttpEvent&>()))
.With(HttpMatcher(message, path, deserializer))
.Times(0);
}
@@ -607,7 +634,9 @@ void ExpectNoFutureUnionHttpRequests(
TestsFilter* filter = FilterTestEventListener::instance()->install();
Future<http::Request> future;
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const HttpEvent&>()))
+ EXPECT_CALL(filter->mock, filter(
+ testing::A<const UPID&>(),
+ testing::A<const HttpEvent&>()))
.With(UnionHttpMatcher(message, unionType, path, deserializer))
.Times(0);
}
@@ -619,8 +648,8 @@ void ExpectNoFutureMessages(Name name, From from, To to)
{
TestsFilter* filter = FilterTestEventListener::instance()->install();
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
- .With(MessageMatcher(name, from, to))
+ EXPECT_CALL(filter->mock, filter(to, testing::A<const MessageEvent&>()))
+ .With(MessageMatcher(name, from))
.Times(0);
}
}
@@ -632,8 +661,8 @@ void ExpectNoFutureUnionMessages(
{
TestsFilter* filter = FilterTestEventListener::instance()->install();
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
- .With(UnionMessageMatcher(message, unionType, from, to))
+ EXPECT_CALL(filter->mock, filter(to, testing::A<const MessageEvent&>()))
+ .With(UnionMessageMatcher(message, unionType, from))
.Times(0);
}
}
@@ -644,8 +673,8 @@ void DropDispatches(PID pid, Method method)
{
TestsFilter* filter = FilterTestEventListener::instance()->install();
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
- .With(DispatchMatcher(pid, method))
+ EXPECT_CALL(filter->mock, filter(pid, testing::A<const DispatchEvent&>()))
+ .With(DispatchMatcher(method))
.WillRepeatedly(testing::Return(true));
}
}
@@ -656,8 +685,8 @@ void ExpectNoFutureDispatches(PID pid, Method method)
{
TestsFilter* filter = FilterTestEventListener::instance()->install();
synchronized (filter->mutex) {
- EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
- .With(DispatchMatcher(pid, method))
+ EXPECT_CALL(filter->mock, filter(pid, testing::A<const DispatchEvent&>()))
+ .With(DispatchMatcher(method))
.Times(0);
}
}
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 23f7ce8..fe89f55 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -2970,7 +2970,7 @@ void ProcessManager::resume(ProcessBase* process)
if (filter.load() != nullptr) {
synchronized (filter_mutex) {
Filter* f = filter.load();
- if (f != nullptr && f->filter(event)) {
+ if (f != nullptr && f->filter(process->self(), event)) {
delete event;
continue; // Try and execute the next event.
}
@@ -4024,7 +4024,7 @@ void dispatch(
{
process::initialize();
- DispatchEvent* event = new DispatchEvent(pid, std::move(f), functionType);
+ DispatchEvent* event = new DispatchEvent(std::move(f), functionType);
process_manager->deliver(pid, event, __process__);
}