You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/07/19 20:21:23 UTC
[4/7] mesos git commit: Removed extra/unnecessary allocations of
Message.
Removed extra/unnecessary allocations of Message.
Review: https://reviews.apache.org/r/60831
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7413a3c4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7413a3c4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7413a3c4
Branch: refs/heads/master
Commit: 7413a3c43a82287519712f24151126453f0d82f6
Parents: c1b6d97
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Jul 11 17:38:01 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Jul 19 13:18:40 2017 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/event.hpp | 27 ++--
3rdparty/libprocess/include/process/gmock.hpp | 44 +++++--
.../libprocess/include/process/protobuf.hpp | 8 +-
3rdparty/libprocess/src/encoder.hpp | 66 ++++------
3rdparty/libprocess/src/process.cpp | 124 +++++++++----------
3rdparty/libprocess/src/tests/process_tests.cpp | 4 +-
3rdparty/libprocess/src/tests/test_linkee.cpp | 2 +-
7 files changed, 139 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/include/process/event.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
index 8afe626..a0ec053 100644
--- a/3rdparty/libprocess/include/process/event.hpp
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -86,30 +86,25 @@ struct Event
struct MessageEvent : Event
{
- explicit MessageEvent(Message* _message)
- : message(_message) {}
+ explicit MessageEvent(Message&& _message)
+ : message(std::move(_message)) {}
- MessageEvent(const MessageEvent& that)
- : message(that.message == nullptr ? nullptr : new Message(*that.message)) {}
+ MessageEvent(const MessageEvent& that) = default;
+ MessageEvent(MessageEvent&& that) = default;
- virtual ~MessageEvent()
- {
- delete message;
- }
+ // Keep MessageEvent not assignable even though we made it
+ // copyable.
+ // Note that we are violating the "rule of three" here but it helps
+ // keep the fields const.
+ MessageEvent& operator=(const MessageEvent&) = delete;
+ MessageEvent& operator=(MessageEvent&&) = delete;
virtual void visit(EventVisitor* visitor) const
{
visitor->visit(*this);
}
- Message* const message;
-
-private:
- // Keep MessageEvent not assignable even though we made it
- // copyable.
- // Note that we are violating the "rule of three" here but it helps
- // keep the fields const.
- MessageEvent& operator=(const MessageEvent&);
+ const Message message;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/include/process/gmock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gmock.hpp b/3rdparty/libprocess/include/process/gmock.hpp
index e9af943..231efcd 100644
--- a/3rdparty/libprocess/include/process/gmock.hpp
+++ b/3rdparty/libprocess/include/process/gmock.hpp
@@ -109,6 +109,32 @@ PromiseArgFieldActionP2<index, Field, process::Promise<T>*> FutureArgField(
}
+ACTION_TEMPLATE(PromiseArgNotPointerField,
+ HAS_1_TEMPLATE_PARAMS(int, k),
+ AND_2_VALUE_PARAMS(field, promise))
+{
+ // TODO(benh): Use a shared_ptr for promise to defend against this
+ // action getting invoked more than once (e.g., used via
+ // WillRepeatedly). We won't be able to set it a second time but at
+ // least we won't get a segmentation fault. We could also consider
+ // warning users if they attempted to set it more than once.
+ promise->set(std::get<k>(args).*field);
+ delete promise;
+}
+
+
+template <int index, typename Field, typename T>
+PromiseArgNotPointerFieldActionP2<index, Field, process::Promise<T>*>
+FutureArgNotPointerField(
+ Field field,
+ process::Future<T>* future)
+{
+ process::Promise<T>* promise = new process::Promise<T>();
+ *future = promise->future();
+ return PromiseArgNotPointerField<index>(field, promise);
+}
+
+
ACTION_P2(PromiseSatisfy, promise, value)
{
promise->set(value);
@@ -319,9 +345,9 @@ private:
MATCHER_P3(MessageMatcher, name, from, to, "")
{
const MessageEvent& event = ::std::get<0>(arg);
- return (testing::Matcher<std::string>(name).Matches(event.message->name) &&
- testing::Matcher<UPID>(from).Matches(event.message->from) &&
- testing::Matcher<UPID>(to).Matches(event.message->to));
+ return (testing::Matcher<std::string>(name).Matches(event.message.name) &&
+ testing::Matcher<UPID>(from).Matches(event.message.from) &&
+ testing::Matcher<UPID>(to).Matches(event.message.to));
}
@@ -334,11 +360,11 @@ MATCHER_P4(UnionMessageMatcher, message, unionType, from, to, "")
message_type message;
return (testing::Matcher<std::string>(message.GetTypeName()).Matches(
- event.message->name) &&
- message.ParseFromString(event.message->body) &&
+ event.message.name) &&
+ message.ParseFromString(event.message.body) &&
testing::Matcher<unionType_type>(unionType).Matches(message.type()) &&
- testing::Matcher<process::UPID>(from).Matches(event.message->from) &&
- testing::Matcher<process::UPID>(to).Matches(event.message->to));
+ testing::Matcher<process::UPID>(from).Matches(event.message.from) &&
+ testing::Matcher<process::UPID>(to).Matches(event.message.to));
}
@@ -440,7 +466,7 @@ Future<Message> FutureMessage(Name name, From from, To to, bool drop = false)
synchronized (filter->mutex) {
EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
.With(MessageMatcher(name, from, to))
- .WillOnce(testing::DoAll(FutureArgField<0>(
+ .WillOnce(testing::DoAll(FutureArgNotPointerField<0>(
&MessageEvent::message,
&future),
testing::Return(drop)))
@@ -462,7 +488,7 @@ Future<process::Message> FutureUnionMessage(
synchronized (filter->mutex) {
EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
.With(UnionMessageMatcher(message, unionType, from, to))
- .WillOnce(testing::DoAll(FutureArgField<0>(
+ .WillOnce(testing::DoAll(FutureArgNotPointerField<0>(
&MessageEvent::message,
&future),
testing::Return(drop)))
http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/include/process/protobuf.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/protobuf.hpp b/3rdparty/libprocess/include/process/protobuf.hpp
index ba6e6d6..2b6b623 100644
--- a/3rdparty/libprocess/include/process/protobuf.hpp
+++ b/3rdparty/libprocess/include/process/protobuf.hpp
@@ -99,10 +99,10 @@ public:
protected:
virtual void visit(const process::MessageEvent& event)
{
- if (protobufHandlers.count(event.message->name) > 0) {
- from = event.message->from; // For 'reply'.
- protobufHandlers[event.message->name](
- event.message->from, event.message->body);
+ if (protobufHandlers.count(event.message.name) > 0) {
+ from = event.message.from; // For 'reply'.
+ protobufHandlers[event.message.name](
+ event.message.from, event.message.body);
from = process::UPID();
} else {
process::Process<T>::visit(event);
http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 517ec21..70b5ec4 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -101,54 +101,42 @@ private:
class MessageEncoder : public DataEncoder
{
public:
- MessageEncoder(Message* _message)
- : DataEncoder(encode(_message)), message(_message) {}
+ MessageEncoder(const Message& message)
+ : DataEncoder(encode(message)) {}
- virtual ~MessageEncoder()
- {
- if (message != nullptr) {
- delete message;
- }
- }
-
- static std::string encode(Message* message)
+ static std::string encode(const Message& message)
{
std::ostringstream out;
- if (message != nullptr) {
- out << "POST ";
- // Nothing keeps the 'id' component of a PID from being an empty
- // string which would create a malformed path that has two
- // '//' unless we check for it explicitly.
- // TODO(benh): Make the 'id' part of a PID optional so when it's
- // missing it's clear that we're simply addressing an ip:port.
- if (message->to.id != "") {
- out << "/" << message->to.id;
- }
+ out << "POST ";
+ // Nothing keeps the 'id' component of a PID from being an empty
+ // string which would create a malformed path that has two
+ // '//' unless we check for it explicitly.
+ // TODO(benh): Make the 'id' part of a PID optional so when it's
+ // missing it's clear that we're simply addressing an ip:port.
+ if (message.to.id != "") {
+ out << "/" << message.to.id;
+ }
- out << "/" << message->name << " HTTP/1.1\r\n"
- << "User-Agent: libprocess/" << message->from << "\r\n"
- << "Libprocess-From: " << message->from << "\r\n"
- << "Connection: Keep-Alive\r\n"
- << "Host: \r\n";
-
- if (message->body.size() > 0) {
- out << "Transfer-Encoding: chunked\r\n\r\n"
- << std::hex << message->body.size() << "\r\n";
- out.write(message->body.data(), message->body.size());
- out << "\r\n"
- << "0\r\n"
- << "\r\n";
- } else {
- out << "\r\n";
- }
+ out << "/" << message.name << " HTTP/1.1\r\n"
+ << "User-Agent: libprocess/" << message.from << "\r\n"
+ << "Libprocess-From: " << message.from << "\r\n"
+ << "Connection: Keep-Alive\r\n"
+ << "Host: \r\n";
+
+ if (message.body.size() > 0) {
+ out << "Transfer-Encoding: chunked\r\n\r\n"
+ << std::hex << message.body.size() << "\r\n";
+ out.write(message.body.data(), message.body.size());
+ out << "\r\n"
+ << "0\r\n"
+ << "\r\n";
+ } else {
+ out << "\r\n";
}
return out.str();
}
-
-private:
- Message* message;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 182dd91..4ddeba3 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -426,7 +426,7 @@ public:
void send(const Response& response,
const Request& request,
const Socket& socket);
- void send(Message* message,
+ void send(Message&& message,
const SocketImpl::Kind& kind = SocketImpl::DEFAULT_KIND());
Encoder* next(int_fd s);
@@ -467,7 +467,7 @@ private:
void send_connect(
const Future<Nothing>& future,
Socket socket,
- Message* message);
+ Message&& message);
// Collection of all active sockets (both inbound and outbound).
hashmap<int_fd, Socket> sockets;
@@ -740,28 +740,27 @@ void Clock::settle()
}
-static Message* encode(const UPID& from,
- const UPID& to,
- const string& name,
- const string& data = "")
+static Message encode(
+ const UPID& from,
+ const UPID& to,
+ const string& name,
+ const char* data,
+ size_t length)
{
- Message* message = new Message();
- message->from = from;
- message->to = to;
- message->name = name;
- message->body = data;
+ Message message{name, from, to, string(data, length)};
return message;
}
-static void transport(Message* message, ProcessBase* sender = nullptr)
+static void transport(Message&& message, ProcessBase* sender = nullptr)
{
- if (message->to.address == __address__) {
+ if (message.to.address == __address__) {
// Local message.
- process_manager->deliver(message->to, new MessageEvent(message), sender);
+ MessageEvent* event = new MessageEvent(std::move(message));
+ process_manager->deliver(event->message.to, event, sender);
} else {
// Remote message.
- socket_manager->send(message);
+ socket_manager->send(std::move(message));
}
}
@@ -802,7 +801,7 @@ static Future<Owned<Request>> convert(Owned<Request>&& pipeRequest)
}
-static Future<Message*> parse(const Request& request)
+static Future<MessageEvent*> parse(const Request& request)
{
// TODO(benh): Do better error handling (to deal with a malformed
// libprocess message, malicious or otherwise).
@@ -856,13 +855,13 @@ static Future<Message*> parse(const Request& request)
return reader.readAll()
.then([from, name, to](const string& body) {
- Message* message = new Message();
- message->name = name;
- message->from = from.get();
- message->to = to;
- message->body = body;
+ Message message;
+ message.name = name;
+ message.from = from.get();
+ message.to = to;
+ message.body = body;
- return message;
+ return new MessageEvent(std::move(message));
});
}
@@ -2192,12 +2191,12 @@ void SocketManager::send(
void SocketManager::send_connect(
const Future<Nothing>& future,
Socket socket,
- Message* message)
+ Message&& message)
{
if (future.isDiscarded() || future.isFailed()) {
if (future.isFailed()) {
- VLOG(1) << "Failed to send '" << message->name << "' to '"
- << message->to.address << "', connect: " << future.failure();
+ VLOG(1) << "Failed to send '" << message.name << "' to '"
+ << message.to.address << "', connect: " << future.failure();
}
// Check if SSL is enabled, and whether we allow a downgrade to
@@ -2219,7 +2218,6 @@ void SocketManager::send_connect(
if (create.isError()) {
VLOG(1) << "Failed to link, create socket: " << create.error();
socket_manager->close(socket);
- delete message;
return;
}
@@ -2234,13 +2232,13 @@ void SocketManager::send_connect(
}
CHECK_SOME(poll_socket);
- poll_socket.get().connect(message->to.address)
+ poll_socket.get().connect(message.to.address)
.onAny(lambda::bind(
- &SocketManager::send_connect,
- this,
- lambda::_1,
- poll_socket.get(),
- message));
+ // TODO(benh): with C++14 we can use lambda instead of
+ // `std::bind` and capture `message` with a `std::move`.
+ [this, poll_socket](Message& message, const Future<Nothing>& f) {
+ send_connect(f, poll_socket.get(), std::move(message));
+ }, std::move(message), lambda::_1));
// We don't need to 'shutdown()' the socket as it was never
// connected.
@@ -2250,7 +2248,6 @@ void SocketManager::send_connect(
socket_manager->close(socket);
- delete message;
return;
}
@@ -2274,11 +2271,9 @@ void SocketManager::send_connect(
}
-void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
+void SocketManager::send(Message&& message, const SocketImpl::Kind& kind)
{
- CHECK(message != nullptr);
-
- const Address& address = message->to.address;
+ const Address& address = message.to.address;
Option<Socket> socket = None();
bool connect = false;
@@ -2315,7 +2310,6 @@ void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
Try<Socket> create = Socket::create(kind);
if (create.isError()) {
VLOG(1) << "Failed to send, create socket: " << create.error();
- delete message;
return;
}
socket = create.get();
@@ -2340,11 +2334,11 @@ void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
CHECK_SOME(socket);
socket->connect(address)
.onAny(lambda::bind(
- &SocketManager::send_connect,
- this,
- lambda::_1,
- socket.get(),
- message));
+ // TODO(benh): with C++14 we can use lambda instead of
+ // `std::bind` and capture `message` with a `std::move`.
+ [this, socket](Message& message, const Future<Nothing>& f) {
+ send_connect(f, socket.get(), std::move(message));
+ }, std::move(message), lambda::_1));
} else {
// If we're not connecting and we haven't added the encoder to
// the 'outgoing' queue then schedule it to be sent.
@@ -2897,7 +2891,7 @@ void ProcessManager::handle(
// from `SocketManager::finalize()` due to it closing all active sockets
// during libprocess finalization.
parse(*request)
- .onAny([this, socket, request](const Future<Message*>& future) {
+ .onAny([this, socket, request](const Future<MessageEvent*>& future) {
// Get the HttpProxy pid for this socket.
PID<HttpProxy> proxy = socket_manager->proxy(socket);
@@ -2914,7 +2908,7 @@ void ProcessManager::handle(
return;
}
- Message* message = CHECK_NOTNULL(future.get());
+ MessageEvent* event = CHECK_NOTNULL(future.get());
// Verify that the UPID this peer is claiming is on the same IP
// address the peer is sending from.
@@ -2927,10 +2921,10 @@ void ProcessManager::handle(
network::convert<Address>(request->client.get());
if (client_ip_address.isError() ||
- message->from.address.ip != client_ip_address->ip) {
+ event->message.from.address.ip != client_ip_address->ip) {
Response response = BadRequest(
"UPID IP address validation failed: Message from " +
- stringify(message->from) + " was sent from IP " +
+ stringify(event->message.from) + " was sent from IP " +
stringify(request->client.get()));
dispatch(proxy, &HttpProxy::enqueue, response, *request);
@@ -2940,14 +2934,14 @@ void ProcessManager::handle(
<< ": " << response.body;
delete request;
- delete message;
+ delete event;
return;
}
}
// TODO(benh): Use the sender PID when delivering in order to
// capture happens-before timing relationships for testing.
- bool accepted = deliver(message->to, new MessageEvent(message));
+ bool accepted = deliver(event->message.to, event);
// Only send back an HTTP response if this isn't from libprocess
// (which we determine by looking at the User-Agent). This is
@@ -3676,7 +3670,7 @@ Future<Response> ProcessManager::__processes__(const Request&)
JSON::Object object;
object.values["type"] = "MESSAGE";
- const Message& message = *event.message;
+ const Message& message = event.message;
object.values["name"] = message.name;
object.values["from"] = string(message.from);
@@ -3798,9 +3792,9 @@ void ProcessBase::inject(
if (!from)
return;
- Message* message = encode(from, pid, name, string(data, length));
+ Message message = encode(from, pid, name, data, length);
- enqueue(new MessageEvent(message), true);
+ enqueue(new MessageEvent(std::move(message)), true);
}
@@ -3815,22 +3809,22 @@ void ProcessBase::send(
}
// Encode and transport outgoing message.
- transport(encode(pid, to, name, string(data, length)), this);
+ transport(encode(pid, to, name, data, length), this);
}
void ProcessBase::visit(const MessageEvent& event)
{
- if (handlers.message.count(event.message->name) > 0) {
- handlers.message[event.message->name](
- event.message->from,
- event.message->body);
- } else if (delegates.count(event.message->name) > 0) {
- VLOG(1) << "Delegating message '" << event.message->name
- << "' to " << delegates[event.message->name];
- Message* message = new Message(*event.message);
- message->to = delegates[event.message->name];
- transport(message, this);
+ if (handlers.message.count(event.message.name) > 0) {
+ handlers.message[event.message.name](
+ event.message.from,
+ event.message.body);
+ } else if (delegates.count(event.message.name) > 0) {
+ VLOG(1) << "Delegating message '" << event.message.name
+ << "' to " << delegates[event.message.name];
+ Message message(event.message);
+ message.to = delegates[event.message.name];
+ transport(std::move(message), this);
}
}
@@ -4205,7 +4199,7 @@ void post(const UPID& to, const string& name, const char* data, size_t length)
}
// Encode and transport outgoing message.
- transport(encode(UPID(), to, name, string(data, length)));
+ transport(encode(UPID(), to, name, data, length));
}
@@ -4222,7 +4216,7 @@ void post(const UPID& from,
}
// Encode and transport outgoing message.
- transport(encode(from, to, name, string(data, length)));
+ transport(encode(from, to, name, data, length));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index c610954..ed11909 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -741,7 +741,7 @@ protected:
AWAIT_ASSERT_READY(event);
// Save the PID of the linkee.
- pid = event->message->from;
+ pid = event->message.from;
terminate(coordinator);
wait(coordinator);
@@ -1272,7 +1272,7 @@ TEST(ProcessTest, THREADSAFE_Remote)
message.from = UPID("sender", sender.get());
message.to = process.self();
- const string data = MessageEncoder::encode(&message);
+ const string data = MessageEncoder::encode(message);
AWAIT_READY(socket.send(data));
http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/src/tests/test_linkee.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/test_linkee.cpp b/3rdparty/libprocess/src/tests/test_linkee.cpp
index 29df3c5..cc48271 100644
--- a/3rdparty/libprocess/src/tests/test_linkee.cpp
+++ b/3rdparty/libprocess/src/tests/test_linkee.cpp
@@ -187,7 +187,7 @@ int main(int argc, char** argv)
message.from = UPID("(1)", address);
message.to = parent;
- outgoing->send(MessageEncoder::encode(&message));
+ outgoing->send(MessageEncoder::encode(message));
});
// Now sit and accept links until the linkee is killed.