You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2019/12/04 17:15:22 UTC
[mesos] 04/04: Improved operator api subscribe initial payload
performance.
This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit d9fe31158b54238e1621a668189d34a793dacd5e
Author: Benjamin Mahler <bm...@apache.org>
AuthorDate: Mon Nov 25 18:44:21 2019 -0500
Improved operator api subscribe initial payload performance.
This uses the same approach for other GET_ calls in MESOS-10026
of directly serializing from in-memory state, rather than building
up the temporary object and evolving it.
There is currently no benchmark but the improvement should closely
resemble that of the GET_STATE call, for example:
Before:
v0 '/state' response took 6.55 secs
v1 'GetState' application/x-protobuf response took 24.08 secs
v1 'GetState' application/json response took 22.76 secs
After:
v0 '/state' response took 8.00 secs
v1 'GetState' application/x-protobuf response took 5.73 secs
v1 'GetState' application/json response took 9.62 secs
Review: https://reviews.apache.org/r/71827
---
src/master/http.cpp | 136 +++++++++++++++++++++++++++++++++++++++++++++++---
src/master/master.hpp | 5 ++
2 files changed, 134 insertions(+), 7 deletions(-)
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 6d84856..72587bf 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -438,15 +438,70 @@ Future<Response> Master::Http::subscribe(
StreamingHttpConnection<v1::master::Event> http(
pipe.writer(), contentType);
- mesos::master::Event event;
- event.set_type(mesos::master::Event::SUBSCRIBED);
- *event.mutable_subscribed()->mutable_get_state() =
- _getState(approvers);
+ // Serialize the following event:
+ //
+ // mesos::master::Event event;
+ // event.set_type(mesos::master::Event::SUBSCRIBED);
+ // *event.mutable_subscribed()->mutable_get_state() =
+ // _getState(approvers);
+ // event.mutable_subscribed()->set_heartbeat_interval_seconds(
+ // DEFAULT_HEARTBEAT_INTERVAL.secs());
+ //
+ // http.send(event);
+
+ switch (contentType) {
+ case ContentType::PROTOBUF: {
+ string serialized;
+ google::protobuf::io::StringOutputStream stream(&serialized);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ WireFormatLite::WriteEnum(
+ mesos::v1::master::Event::kTypeFieldNumber,
+ mesos::v1::master::Event::SUBSCRIBED,
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Event::kSubscribedFieldNumber,
+ serializeSubscribe(approvers),
+ &writer);
+
+ // We must manually trim the unused buffer space since
+ // we use the string before the coded output stream is
+ // destructed.
+ writer.Trim();
+
+ http.send(serialized);
+
+ break;
+ }
+
+ case ContentType::JSON: {
+ string serialized = jsonify([&](JSON::ObjectWriter* writer) {
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Event::descriptor();
- event.mutable_subscribed()->set_heartbeat_interval_seconds(
- DEFAULT_HEARTBEAT_INTERVAL.secs());
+ int field;
- http.send(event);
+ field = v1::master::Event::kTypeFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ v1::master::Event::Type_Name(
+ v1::master::Event::SUBSCRIBED));
+
+ field = v1::master::Event::kSubscribedFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ jsonifySubscribe(master, approvers));
+ });
+
+ http.send(serialized);
+
+ break;
+ }
+
+ default:
+ return NotAcceptable("Request must accept json or protobuf");
+ }
mesos::master::Event heartbeatEvent;
heartbeatEvent.set_type(mesos::master::Event::HEARTBEAT);
@@ -461,6 +516,73 @@ Future<Response> Master::Http::subscribe(
}
+function<void(JSON::ObjectWriter*)> Master::Http::jsonifySubscribe(
+ const Master* master,
+ const Owned<ObjectApprovers>& approvers)
+{
+ // Jsonify the following message:
+ //
+ // mesos::master::Event::Subscribed subscribed;
+ // *subscribed.mutable_get_state() = _getState(approvers);
+ // subscribed.set_heartbeat_interval_seconds(
+ // DEFAULT_HEARTBEAT_INTERVAL.secs());
+
+ // TODO(bmahler): This copies the Owned object approvers.
+ return [=](JSON::ObjectWriter* writer) {
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Event::Subscribed::descriptor();
+
+ int field;
+
+ field = v1::master::Event::Subscribed::kGetStateFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ jsonifyGetState(master, approvers));
+
+ field = v1::master::Event::Subscribed::kHeartbeatIntervalSecondsFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ DEFAULT_HEARTBEAT_INTERVAL.secs());
+ };
+}
+
+
+string Master::Http::serializeSubscribe(
+ const Owned<ObjectApprovers>& approvers) const
+{
+ // Serialize the following message:
+ //
+ // mesos::master::Event::Subscribed subscribed;
+ // *subscribed.mutable_get_state() = _getState(approvers);
+ // subscribed.set_heartbeat_interval_seconds(
+ // DEFAULT_HEARTBEAT_INTERVAL.secs());
+
+ string output;
+ google::protobuf::io::StringOutputStream stream(&output);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Event::Subscribed::kGetStateFieldNumber,
+ serializeGetState(approvers),
+ &writer);
+
+ WireFormatLite::WriteDouble(
+ mesos::v1::master::Event::Subscribed
+ ::kHeartbeatIntervalSecondsFieldNumber,
+ DEFAULT_HEARTBEAT_INTERVAL.secs(),
+ &writer);
+
+ // While an explicit Trim() isn't necessary (since the coded
+ // output stream is destructed before the string is returned),
+ // it's a quite tricky bug to diagnose if Trim() is missed, so
+ // we always do it explicitly to signal the reader about this
+ // subtlety.
+ writer.Trim();
+
+ return output;
+}
+
+
// TODO(ijimenez): Add some information or pointers to help
// users understand the HTTP Event/Call API.
string Master::Http::SCHEDULER_HELP()
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 9363042..f97b085 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1959,6 +1959,11 @@ private:
mesos::master::Response::GetState _getState(
const process::Owned<ObjectApprovers>& approvers) const;
+ static std::function<void(JSON::ObjectWriter*)> jsonifySubscribe(
+ const Master* master,
+ const process::Owned<ObjectApprovers>& approvers);
+ std::string serializeSubscribe(
+ const process::Owned<ObjectApprovers>& approvers) const;
process::Future<process::http::Response> subscribe(
const mesos::master::Call& call,
const Option<process::http::authentication::Principal>& principal,