You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/07/19 20:21:23 UTC

[4/7] mesos git commit: Removed extra/unnecessary allocations of Message.

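Removed extra/unnecessary allocations of Message.

`MessageEvent` now holds its `Message` by value instead of through an
owning `Message*`, and messages are moved (`Message&&`) through
`transport()`, `SocketManager::send()` and `send_connect()` rather than
being heap-allocated with `new` and released with manual `delete` calls.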

Review: https://reviews.apache.org/r/60831


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7413a3c4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7413a3c4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7413a3c4

Branch: refs/heads/master
Commit: 7413a3c43a82287519712f24151126453f0d82f6
Parents: c1b6d97
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Jul 11 17:38:01 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Jul 19 13:18:40 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/event.hpp   |  27 ++--
 3rdparty/libprocess/include/process/gmock.hpp   |  44 +++++--
 .../libprocess/include/process/protobuf.hpp     |   8 +-
 3rdparty/libprocess/src/encoder.hpp             |  66 ++++------
 3rdparty/libprocess/src/process.cpp             | 124 +++++++++----------
 3rdparty/libprocess/src/tests/process_tests.cpp |   4 +-
 3rdparty/libprocess/src/tests/test_linkee.cpp   |   2 +-
 7 files changed, 139 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/include/process/event.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
index 8afe626..a0ec053 100644
--- a/3rdparty/libprocess/include/process/event.hpp
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -86,30 +86,25 @@ struct Event
 
 struct MessageEvent : Event
 {
-  explicit MessageEvent(Message* _message)
-    : message(_message) {}
+  explicit MessageEvent(Message&& _message)
+    : message(std::move(_message)) {}
 
-  MessageEvent(const MessageEvent& that)
-    : message(that.message == nullptr ? nullptr : new Message(*that.message)) {}
+  MessageEvent(const MessageEvent& that) = default;
+  MessageEvent(MessageEvent&& that) = default;
 
-  virtual ~MessageEvent()
-  {
-    delete message;
-  }
+  // Keep MessageEvent not assignable even though we made it
+  // copyable.
+  // Note that we are violating the "rule of three" here but it helps
+  // keep the fields const.
+  MessageEvent& operator=(const MessageEvent&) = delete;
+  MessageEvent& operator=(MessageEvent&&) = delete;
 
   virtual void visit(EventVisitor* visitor) const
   {
     visitor->visit(*this);
   }
 
-  Message* const message;
-
-private:
-  // Keep MessageEvent not assignable even though we made it
-  // copyable.
-  // Note that we are violating the "rule of three" here but it helps
-  // keep the fields const.
-  MessageEvent& operator=(const MessageEvent&);
+  const Message message;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/include/process/gmock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gmock.hpp b/3rdparty/libprocess/include/process/gmock.hpp
index e9af943..231efcd 100644
--- a/3rdparty/libprocess/include/process/gmock.hpp
+++ b/3rdparty/libprocess/include/process/gmock.hpp
@@ -109,6 +109,32 @@ PromiseArgFieldActionP2<index, Field, process::Promise<T>*> FutureArgField(
 }
 
 
+ACTION_TEMPLATE(PromiseArgNotPointerField,
+                HAS_1_TEMPLATE_PARAMS(int, k),
+                AND_2_VALUE_PARAMS(field, promise))
+{
+  // TODO(benh): Use a shared_ptr for promise to defend against this
+  // action getting invoked more than once (e.g., used via
+  // WillRepeatedly). We won't be able to set it a second time but at
+  // least we won't get a segmentation fault. We could also consider
+  // warning users if they attempted to set it more than once.
+  promise->set(std::get<k>(args).*field);
+  delete promise;
+}
+
+
+template <int index, typename Field, typename T>
+PromiseArgNotPointerFieldActionP2<index, Field, process::Promise<T>*>
+FutureArgNotPointerField(
+    Field field,
+    process::Future<T>* future)
+{
+  process::Promise<T>* promise = new process::Promise<T>();
+  *future = promise->future();
+  return PromiseArgNotPointerField<index>(field, promise);
+}
+
+
 ACTION_P2(PromiseSatisfy, promise, value)
 {
   promise->set(value);
@@ -319,9 +345,9 @@ private:
 MATCHER_P3(MessageMatcher, name, from, to, "")
 {
   const MessageEvent& event = ::std::get<0>(arg);
-  return (testing::Matcher<std::string>(name).Matches(event.message->name) &&
-          testing::Matcher<UPID>(from).Matches(event.message->from) &&
-          testing::Matcher<UPID>(to).Matches(event.message->to));
+  return (testing::Matcher<std::string>(name).Matches(event.message.name) &&
+          testing::Matcher<UPID>(from).Matches(event.message.from) &&
+          testing::Matcher<UPID>(to).Matches(event.message.to));
 }
 
 
@@ -334,11 +360,11 @@ MATCHER_P4(UnionMessageMatcher, message, unionType, from, to, "")
   message_type message;
 
   return (testing::Matcher<std::string>(message.GetTypeName()).Matches(
-              event.message->name) &&
-          message.ParseFromString(event.message->body) &&
+              event.message.name) &&
+          message.ParseFromString(event.message.body) &&
           testing::Matcher<unionType_type>(unionType).Matches(message.type()) &&
-          testing::Matcher<process::UPID>(from).Matches(event.message->from) &&
-          testing::Matcher<process::UPID>(to).Matches(event.message->to));
+          testing::Matcher<process::UPID>(from).Matches(event.message.from) &&
+          testing::Matcher<process::UPID>(to).Matches(event.message.to));
 }
 
 
@@ -440,7 +466,7 @@ Future<Message> FutureMessage(Name name, From from, To to, bool drop = false)
   synchronized (filter->mutex) {
     EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
       .With(MessageMatcher(name, from, to))
-      .WillOnce(testing::DoAll(FutureArgField<0>(
+      .WillOnce(testing::DoAll(FutureArgNotPointerField<0>(
                                    &MessageEvent::message,
                                    &future),
                                testing::Return(drop)))
@@ -462,7 +488,7 @@ Future<process::Message> FutureUnionMessage(
   synchronized (filter->mutex) {
     EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
       .With(UnionMessageMatcher(message, unionType, from, to))
-      .WillOnce(testing::DoAll(FutureArgField<0>(
+      .WillOnce(testing::DoAll(FutureArgNotPointerField<0>(
                                    &MessageEvent::message,
                                    &future),
                                testing::Return(drop)))

http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/include/process/protobuf.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/protobuf.hpp b/3rdparty/libprocess/include/process/protobuf.hpp
index ba6e6d6..2b6b623 100644
--- a/3rdparty/libprocess/include/process/protobuf.hpp
+++ b/3rdparty/libprocess/include/process/protobuf.hpp
@@ -99,10 +99,10 @@ public:
 protected:
   virtual void visit(const process::MessageEvent& event)
   {
-    if (protobufHandlers.count(event.message->name) > 0) {
-      from = event.message->from; // For 'reply'.
-      protobufHandlers[event.message->name](
-          event.message->from, event.message->body);
+    if (protobufHandlers.count(event.message.name) > 0) {
+      from = event.message.from; // For 'reply'.
+      protobufHandlers[event.message.name](
+          event.message.from, event.message.body);
       from = process::UPID();
     } else {
       process::Process<T>::visit(event);

http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 517ec21..70b5ec4 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -101,54 +101,42 @@ private:
 class MessageEncoder : public DataEncoder
 {
 public:
-  MessageEncoder(Message* _message)
-    : DataEncoder(encode(_message)), message(_message) {}
+  MessageEncoder(const Message& message)
+    : DataEncoder(encode(message)) {}
 
-  virtual ~MessageEncoder()
-  {
-    if (message != nullptr) {
-      delete message;
-    }
-  }
-
-  static std::string encode(Message* message)
+  static std::string encode(const Message& message)
   {
     std::ostringstream out;
 
-    if (message != nullptr) {
-      out << "POST ";
-      // Nothing keeps the 'id' component of a PID from being an empty
-      // string which would create a malformed path that has two
-      // '//' unless we check for it explicitly.
-      // TODO(benh): Make the 'id' part of a PID optional so when it's
-      // missing it's clear that we're simply addressing an ip:port.
-      if (message->to.id != "") {
-        out << "/" << message->to.id;
-      }
+    out << "POST ";
+    // Nothing keeps the 'id' component of a PID from being an empty
+    // string which would create a malformed path that has two
+    // '//' unless we check for it explicitly.
+    // TODO(benh): Make the 'id' part of a PID optional so when it's
+    // missing it's clear that we're simply addressing an ip:port.
+    if (message.to.id != "") {
+      out << "/" << message.to.id;
+    }
 
-      out << "/" << message->name << " HTTP/1.1\r\n"
-          << "User-Agent: libprocess/" << message->from << "\r\n"
-          << "Libprocess-From: " << message->from << "\r\n"
-          << "Connection: Keep-Alive\r\n"
-          << "Host: \r\n";
-
-      if (message->body.size() > 0) {
-        out << "Transfer-Encoding: chunked\r\n\r\n"
-            << std::hex << message->body.size() << "\r\n";
-        out.write(message->body.data(), message->body.size());
-        out << "\r\n"
-            << "0\r\n"
-            << "\r\n";
-      } else {
-        out << "\r\n";
-      }
+    out << "/" << message.name << " HTTP/1.1\r\n"
+        << "User-Agent: libprocess/" << message.from << "\r\n"
+        << "Libprocess-From: " << message.from << "\r\n"
+        << "Connection: Keep-Alive\r\n"
+        << "Host: \r\n";
+
+    if (message.body.size() > 0) {
+      out << "Transfer-Encoding: chunked\r\n\r\n"
+          << std::hex << message.body.size() << "\r\n";
+      out.write(message.body.data(), message.body.size());
+      out << "\r\n"
+          << "0\r\n"
+          << "\r\n";
+    } else {
+      out << "\r\n";
     }
 
     return out.str();
   }
-
-private:
-  Message* message;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 182dd91..4ddeba3 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -426,7 +426,7 @@ public:
   void send(const Response& response,
             const Request& request,
             const Socket& socket);
-  void send(Message* message,
+  void send(Message&& message,
             const SocketImpl::Kind& kind = SocketImpl::DEFAULT_KIND());
 
   Encoder* next(int_fd s);
@@ -467,7 +467,7 @@ private:
   void send_connect(
       const Future<Nothing>& future,
       Socket socket,
-      Message* message);
+      Message&& message);
 
   // Collection of all active sockets (both inbound and outbound).
   hashmap<int_fd, Socket> sockets;
@@ -740,28 +740,27 @@ void Clock::settle()
 }
 
 
-static Message* encode(const UPID& from,
-                       const UPID& to,
-                       const string& name,
-                       const string& data = "")
+static Message encode(
+    const UPID& from,
+    const UPID& to,
+    const string& name,
+    const char* data,
+    size_t length)
 {
-  Message* message = new Message();
-  message->from = from;
-  message->to = to;
-  message->name = name;
-  message->body = data;
+  Message message{name, from, to, string(data, length)};
   return message;
 }
 
 
-static void transport(Message* message, ProcessBase* sender = nullptr)
+static void transport(Message&& message, ProcessBase* sender = nullptr)
 {
-  if (message->to.address == __address__) {
+  if (message.to.address == __address__) {
     // Local message.
-    process_manager->deliver(message->to, new MessageEvent(message), sender);
+    MessageEvent* event = new MessageEvent(std::move(message));
+    process_manager->deliver(event->message.to, event, sender);
   } else {
     // Remote message.
-    socket_manager->send(message);
+    socket_manager->send(std::move(message));
   }
 }
 
@@ -802,7 +801,7 @@ static Future<Owned<Request>> convert(Owned<Request>&& pipeRequest)
 }
 
 
-static Future<Message*> parse(const Request& request)
+static Future<MessageEvent*> parse(const Request& request)
 {
   // TODO(benh): Do better error handling (to deal with a malformed
   // libprocess message, malicious or otherwise).
@@ -856,13 +855,13 @@ static Future<Message*> parse(const Request& request)
 
   return reader.readAll()
     .then([from, name, to](const string& body) {
-      Message* message = new Message();
-      message->name = name;
-      message->from = from.get();
-      message->to = to;
-      message->body = body;
+      Message message;
+      message.name = name;
+      message.from = from.get();
+      message.to = to;
+      message.body = body;
 
-      return message;
+      return new MessageEvent(std::move(message));
     });
 }
 
@@ -2192,12 +2191,12 @@ void SocketManager::send(
 void SocketManager::send_connect(
     const Future<Nothing>& future,
     Socket socket,
-    Message* message)
+    Message&& message)
 {
   if (future.isDiscarded() || future.isFailed()) {
     if (future.isFailed()) {
-      VLOG(1) << "Failed to send '" << message->name << "' to '"
-              << message->to.address << "', connect: " << future.failure();
+      VLOG(1) << "Failed to send '" << message.name << "' to '"
+              << message.to.address << "', connect: " << future.failure();
     }
 
     // Check if SSL is enabled, and whether we allow a downgrade to
@@ -2219,7 +2218,6 @@ void SocketManager::send_connect(
         if (create.isError()) {
           VLOG(1) << "Failed to link, create socket: " << create.error();
           socket_manager->close(socket);
-          delete message;
           return;
         }
 
@@ -2234,13 +2232,13 @@ void SocketManager::send_connect(
       }
 
       CHECK_SOME(poll_socket);
-      poll_socket.get().connect(message->to.address)
+      poll_socket.get().connect(message.to.address)
         .onAny(lambda::bind(
-            &SocketManager::send_connect,
-            this,
-            lambda::_1,
-            poll_socket.get(),
-            message));
+            // TODO(benh): with C++14 we can use lambda instead of
+            // `std::bind` and capture `message` with a `std::move`.
+            [this, poll_socket](Message& message, const Future<Nothing>& f) {
+              send_connect(f, poll_socket.get(), std::move(message));
+            }, std::move(message), lambda::_1));
 
       // We don't need to 'shutdown()' the socket as it was never
       // connected.
@@ -2250,7 +2248,6 @@ void SocketManager::send_connect(
 
     socket_manager->close(socket);
 
-    delete message;
     return;
   }
 
@@ -2274,11 +2271,9 @@ void SocketManager::send_connect(
 }
 
 
-void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
+void SocketManager::send(Message&& message, const SocketImpl::Kind& kind)
 {
-  CHECK(message != nullptr);
-
-  const Address& address = message->to.address;
+  const Address& address = message.to.address;
 
   Option<Socket> socket = None();
   bool connect = false;
@@ -2315,7 +2310,6 @@ void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
       Try<Socket> create = Socket::create(kind);
       if (create.isError()) {
         VLOG(1) << "Failed to send, create socket: " << create.error();
-        delete message;
         return;
       }
       socket = create.get();
@@ -2340,11 +2334,11 @@ void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
     CHECK_SOME(socket);
     socket->connect(address)
       .onAny(lambda::bind(
-          &SocketManager::send_connect,
-          this,
-          lambda::_1,
-          socket.get(),
-          message));
+            // TODO(benh): with C++14 we can use lambda instead of
+            // `std::bind` and capture `message` with a `std::move`.
+            [this, socket](Message& message, const Future<Nothing>& f) {
+              send_connect(f, socket.get(), std::move(message));
+            }, std::move(message), lambda::_1));
   } else {
     // If we're not connecting and we haven't added the encoder to
     // the 'outgoing' queue then schedule it to be sent.
@@ -2897,7 +2891,7 @@ void ProcessManager::handle(
     // from `SocketManager::finalize()` due to it closing all active sockets
     // during libprocess finalization.
     parse(*request)
-      .onAny([this, socket, request](const Future<Message*>& future) {
+      .onAny([this, socket, request](const Future<MessageEvent*>& future) {
         // Get the HttpProxy pid for this socket.
         PID<HttpProxy> proxy = socket_manager->proxy(socket);
 
@@ -2914,7 +2908,7 @@ void ProcessManager::handle(
           return;
         }
 
-        Message* message = CHECK_NOTNULL(future.get());
+        MessageEvent* event = CHECK_NOTNULL(future.get());
 
         // Verify that the UPID this peer is claiming is on the same IP
         // address the peer is sending from.
@@ -2927,10 +2921,10 @@ void ProcessManager::handle(
             network::convert<Address>(request->client.get());
 
           if (client_ip_address.isError() ||
-              message->from.address.ip != client_ip_address->ip) {
+              event->message.from.address.ip != client_ip_address->ip) {
             Response response = BadRequest(
                 "UPID IP address validation failed: Message from " +
-                stringify(message->from) + " was sent from IP " +
+                stringify(event->message.from) + " was sent from IP " +
                 stringify(request->client.get()));
 
             dispatch(proxy, &HttpProxy::enqueue, response, *request);
@@ -2940,14 +2934,14 @@ void ProcessManager::handle(
                     << ": " << response.body;
 
             delete request;
-            delete message;
+            delete event;
             return;
           }
         }
 
         // TODO(benh): Use the sender PID when delivering in order to
         // capture happens-before timing relationships for testing.
-        bool accepted = deliver(message->to, new MessageEvent(message));
+        bool accepted = deliver(event->message.to, event);
 
         // Only send back an HTTP response if this isn't from libprocess
         // (which we determine by looking at the User-Agent). This is
@@ -3676,7 +3670,7 @@ Future<Response> ProcessManager::__processes__(const Request&)
           JSON::Object object;
           object.values["type"] = "MESSAGE";
 
-          const Message& message = *event.message;
+          const Message& message = event.message;
 
           object.values["name"] = message.name;
           object.values["from"] = string(message.from);
@@ -3798,9 +3792,9 @@ void ProcessBase::inject(
   if (!from)
     return;
 
-  Message* message = encode(from, pid, name, string(data, length));
+  Message message = encode(from, pid, name, data, length);
 
-  enqueue(new MessageEvent(message), true);
+  enqueue(new MessageEvent(std::move(message)), true);
 }
 
 
@@ -3815,22 +3809,22 @@ void ProcessBase::send(
   }
 
   // Encode and transport outgoing message.
-  transport(encode(pid, to, name, string(data, length)), this);
+  transport(encode(pid, to, name, data, length), this);
 }
 
 
 void ProcessBase::visit(const MessageEvent& event)
 {
-  if (handlers.message.count(event.message->name) > 0) {
-    handlers.message[event.message->name](
-        event.message->from,
-        event.message->body);
-  } else if (delegates.count(event.message->name) > 0) {
-    VLOG(1) << "Delegating message '" << event.message->name
-            << "' to " << delegates[event.message->name];
-    Message* message = new Message(*event.message);
-    message->to = delegates[event.message->name];
-    transport(message, this);
+  if (handlers.message.count(event.message.name) > 0) {
+    handlers.message[event.message.name](
+        event.message.from,
+        event.message.body);
+  } else if (delegates.count(event.message.name) > 0) {
+    VLOG(1) << "Delegating message '" << event.message.name
+            << "' to " << delegates[event.message.name];
+    Message message(event.message);
+    message.to = delegates[event.message.name];
+    transport(std::move(message), this);
   }
 }
 
@@ -4205,7 +4199,7 @@ void post(const UPID& to, const string& name, const char* data, size_t length)
   }
 
   // Encode and transport outgoing message.
-  transport(encode(UPID(), to, name, string(data, length)));
+  transport(encode(UPID(), to, name, data, length));
 }
 
 
@@ -4222,7 +4216,7 @@ void post(const UPID& from,
   }
 
   // Encode and transport outgoing message.
-  transport(encode(from, to, name, string(data, length)));
+  transport(encode(from, to, name, data, length));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index c610954..ed11909 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -741,7 +741,7 @@ protected:
     AWAIT_ASSERT_READY(event);
 
     // Save the PID of the linkee.
-    pid = event->message->from;
+    pid = event->message.from;
 
     terminate(coordinator);
     wait(coordinator);
@@ -1272,7 +1272,7 @@ TEST(ProcessTest, THREADSAFE_Remote)
   message.from = UPID("sender", sender.get());
   message.to = process.self();
 
-  const string data = MessageEncoder::encode(&message);
+  const string data = MessageEncoder::encode(message);
 
   AWAIT_READY(socket.send(data));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/src/tests/test_linkee.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/test_linkee.cpp b/3rdparty/libprocess/src/tests/test_linkee.cpp
index 29df3c5..cc48271 100644
--- a/3rdparty/libprocess/src/tests/test_linkee.cpp
+++ b/3rdparty/libprocess/src/tests/test_linkee.cpp
@@ -187,7 +187,7 @@ int main(int argc, char** argv)
       message.from = UPID("(1)", address);
       message.to = parent;
 
-      outgoing->send(MessageEncoder::encode(&message));
+      outgoing->send(MessageEncoder::encode(message));
     });
 
   // Now sit and accept links until the linkee is killed.