You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2019/12/04 17:15:22 UTC

[mesos] 04/04: Improved operator api subscribe initial payload performance.

This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit d9fe31158b54238e1621a668189d34a793dacd5e
Author: Benjamin Mahler <bm...@apache.org>
AuthorDate: Mon Nov 25 18:44:21 2019 -0500

    Improved operator api subscribe initial payload performance.
    
    This uses the same approach for other GET_ calls in MESOS-10026
    of directly serializing from in-memory state, rather than building
    up the temporary object and evolving it.
    
    There is currently no benchmark but the improvement should closely
    resemble that of the GET_STATE call, for example:
    
        Before:
        v0 '/state' response took 6.55 secs
        v1 'GetState' application/x-protobuf response took 24.08 secs
        v1 'GetState' application/json response took 22.76 secs
    
        After:
        v0 '/state' response took 8.00 secs
        v1 'GetState' application/x-protobuf response took 5.73 secs
        v1 'GetState' application/json response took 9.62 secs
    
    Review: https://reviews.apache.org/r/71827
---
 src/master/http.cpp   | 136 +++++++++++++++++++++++++++++++++++++++++++++++---
 src/master/master.hpp |   5 ++
 2 files changed, 134 insertions(+), 7 deletions(-)

diff --git a/src/master/http.cpp b/src/master/http.cpp
index 6d84856..72587bf 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -438,15 +438,70 @@ Future<Response> Master::Http::subscribe(
           StreamingHttpConnection<v1::master::Event> http(
               pipe.writer(), contentType);
 
-          mesos::master::Event event;
-          event.set_type(mesos::master::Event::SUBSCRIBED);
-          *event.mutable_subscribed()->mutable_get_state() =
-            _getState(approvers);
+          // Serialize the following event:
+          //
+          //   mesos::master::Event event;
+          //   event.set_type(mesos::master::Event::SUBSCRIBED);
+          //   *event.mutable_subscribed()->mutable_get_state() =
+          //     _getState(approvers);
+          //   event.mutable_subscribed()->set_heartbeat_interval_seconds(
+          //       DEFAULT_HEARTBEAT_INTERVAL.secs());
+          //
+          //   http.send(event);
+
+          switch (contentType) {
+            case ContentType::PROTOBUF: {
+              string serialized;
+              google::protobuf::io::StringOutputStream stream(&serialized);
+              google::protobuf::io::CodedOutputStream writer(&stream);
+
+              WireFormatLite::WriteEnum(
+                  mesos::v1::master::Event::kTypeFieldNumber,
+                  mesos::v1::master::Event::SUBSCRIBED,
+                  &writer);
+
+              WireFormatLite::WriteBytes(
+                  mesos::v1::master::Event::kSubscribedFieldNumber,
+                  serializeSubscribe(approvers),
+                  &writer);
+
+              // We must manually trim the unused buffer space since
+              // we use the string before the coded output stream is
+              // destructed.
+              writer.Trim();
+
+              http.send(serialized);
+
+              break;
+            }
+
+            case ContentType::JSON: {
+              string serialized = jsonify([&](JSON::ObjectWriter* writer) {
+                const google::protobuf::Descriptor* descriptor =
+                  v1::master::Event::descriptor();
 
-          event.mutable_subscribed()->set_heartbeat_interval_seconds(
-              DEFAULT_HEARTBEAT_INTERVAL.secs());
+                int field;
 
-          http.send(event);
+                field = v1::master::Event::kTypeFieldNumber;
+                writer->field(
+                    descriptor->FindFieldByNumber(field)->name(),
+                    v1::master::Event::Type_Name(
+                        v1::master::Event::SUBSCRIBED));
+
+                field = v1::master::Event::kSubscribedFieldNumber;
+                writer->field(
+                    descriptor->FindFieldByNumber(field)->name(),
+                    jsonifySubscribe(master, approvers));
+              });
+
+              http.send(serialized);
+
+              break;
+            }
+
+            default:
+              return NotAcceptable("Request must accept json or protobuf");
+          }
 
           mesos::master::Event heartbeatEvent;
           heartbeatEvent.set_type(mesos::master::Event::HEARTBEAT);
@@ -461,6 +516,73 @@ Future<Response> Master::Http::subscribe(
 }
 
 
+function<void(JSON::ObjectWriter*)> Master::Http::jsonifySubscribe(
+    const Master* master,
+    const Owned<ObjectApprovers>& approvers)
+{
+  // Jsonify the following message:
+  //
+  //   mesos::master::Event::Subscribed subscribed;
+  //   *subscribed.mutable_get_state() = _getState(approvers);
+  //   subscribed.set_heartbeat_interval_seconds(
+  //       DEFAULT_HEARTBEAT_INTERVAL.secs());
+
+  // TODO(bmahler): This copies the Owned object approvers.
+  return [=](JSON::ObjectWriter* writer) {
+    const google::protobuf::Descriptor* descriptor =
+      v1::master::Event::Subscribed::descriptor();
+
+    int field;
+
+    field = v1::master::Event::Subscribed::kGetStateFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        jsonifyGetState(master, approvers));
+
+    field = v1::master::Event::Subscribed::kHeartbeatIntervalSecondsFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        DEFAULT_HEARTBEAT_INTERVAL.secs());
+  };
+}
+
+
+string Master::Http::serializeSubscribe(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following message:
+  //
+  //   mesos::master::Event::Subscribed subscribed;
+  //   *subscribed.mutable_get_state() = _getState(approvers);
+  //   subscribed.set_heartbeat_interval_seconds(
+  //       DEFAULT_HEARTBEAT_INTERVAL.secs());
+
+  string output;
+  google::protobuf::io::StringOutputStream stream(&output);
+  google::protobuf::io::CodedOutputStream writer(&stream);
+
+  WireFormatLite::WriteBytes(
+      mesos::v1::master::Event::Subscribed::kGetStateFieldNumber,
+      serializeGetState(approvers),
+      &writer);
+
+  WireFormatLite::WriteDouble(
+      mesos::v1::master::Event::Subscribed
+        ::kHeartbeatIntervalSecondsFieldNumber,
+      DEFAULT_HEARTBEAT_INTERVAL.secs(),
+      &writer);
+
+  // While an explicit Trim() isn't necessary (since the coded
+  // output stream is destructed before the string is returned),
+  // it's a quite tricky bug to diagnose if Trim() is missed, so
+  // we always do it explicitly to signal the reader about this
+  // subtlety.
+  writer.Trim();
+
+  return output;
+}
+
+
 // TODO(ijimenez): Add some information or pointers to help
 // users understand the HTTP Event/Call API.
 string Master::Http::SCHEDULER_HELP()
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 9363042..f97b085 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1959,6 +1959,11 @@ private:
     mesos::master::Response::GetState _getState(
         const process::Owned<ObjectApprovers>& approvers) const;
 
+    static std::function<void(JSON::ObjectWriter*)> jsonifySubscribe(
+        const Master* master,
+        const process::Owned<ObjectApprovers>& approvers);
+    std::string serializeSubscribe(
+        const process::Owned<ObjectApprovers>& approvers) const;
     process::Future<process::http::Response> subscribe(
         const mesos::master::Call& call,
         const Option<process::http::authentication::Principal>& principal,