You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2019/12/03 04:14:24 UTC
[mesos] branch master updated: Simplified V0 -> V1 direct
serialization logic.
This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 3dda362 Simplified V0 -> V1 direct serialization logic.
3dda362 is described below
commit 3dda3622f5ed01e8c132dc5ca594b1f1ef51a298
Author: Benjamin Mahler <bm...@apache.org>
AuthorDate: Tue Nov 26 15:01:46 2019 -0500
Simplified V0 -> V1 direct serialization logic.
There are higher level utility functions that can be used to make
the code simpler and less error prone.
Also, while not a bug, we should be using WriteBytes instead of
WriteString for writing submessage bytes.
Review: https://reviews.apache.org/r/71823
---
src/common/protobuf_utils.hpp | 22 +++
src/master/http.cpp | 353 ++++++++++++++++--------------------------
2 files changed, 156 insertions(+), 219 deletions(-)
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 93ee42a..3852f59 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -66,6 +66,28 @@ struct Slave;
namespace protobuf {
+// Modeled after WireFormatLite from protobuf, but to provide
+// missing helpers.
+class WireFormatLite2
+{
+public:
+ // This is a wrapper to compute cached sizes before calling into
+ // `WireFormatLite::WriteMessage`, which assumes that sizes are
+ // already cached.
+ static void WriteMessageWithoutCachedSizes(
+ int field_number,
+ const google::protobuf::MessageLite& value,
+ google::protobuf::io::CodedOutputStream* output)
+ {
+ // Cache the sizes first.
+ value.ByteSizeLong();
+
+ google::protobuf::internal::WireFormatLite::WriteMessage(
+ field_number, value, output);
+ }
+};
+
+
// Internal helper class for protobuf union validation.
class UnionValidator
{
diff --git a/src/master/http.cpp b/src/master/http.cpp
index e036558..6d84856 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -154,6 +154,8 @@ using mesos::authorization::VIEW_FRAMEWORK;
using mesos::authorization::VIEW_ROLE;
using mesos::authorization::VIEW_TASK;
+using mesos::internal::protobuf::WireFormatLite2;
+
namespace mesos {
namespace internal {
namespace master {
@@ -1346,21 +1348,15 @@ Future<Response> Master::Http::getFrameworks(
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::kTypeFieldNumber,
- WireFormatLite::WIRETYPE_VARINT));
- writer.WriteVarint32SignExtended(
- mesos::v1::master::Response::GET_FRAMEWORKS);
-
- string serializedGetFrameworks =
- serializeGetFrameworks(approvers);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::kGetFrameworksFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(serializedGetFrameworks.size());
- writer.WriteString(serializedGetFrameworks);
+ WireFormatLite::WriteEnum(
+ mesos::v1::master::Response::kTypeFieldNumber,
+ mesos::v1::master::Response::GET_FRAMEWORKS,
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::kGetFrameworksFieldNumber,
+ serializeGetFrameworks(approvers),
+ &writer);
// We must manually trim the unused buffer space since
// we use the string before the coded output stream is
@@ -1487,14 +1483,10 @@ string Master::Http::serializeGetFrameworks(
continue;
}
- mesos::master::Response::GetFrameworks::Framework f = model(*framework);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::master::Response::GetFrameworks
- ::kFrameworksFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(f.ByteSizeLong());
- f.SerializeToCodedStream(&writer);
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::master::Response::GetFrameworks::kFrameworksFieldNumber,
+ model(*framework),
+ &writer);
}
foreachvalue (const Owned<Framework>& framework,
@@ -1504,14 +1496,10 @@ string Master::Http::serializeGetFrameworks(
continue;
}
- mesos::master::Response::GetFrameworks::Framework f = model(*framework);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::master::Response::GetFrameworks
- ::kCompletedFrameworksFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(f.ByteSizeLong());
- f.SerializeToCodedStream(&writer);
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber,
+ model(*framework),
+ &writer);
}
// While an explicit Trim() isn't necessary (since the coded
@@ -1579,20 +1567,15 @@ Future<Response> Master::Http::getExecutors(
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::kTypeFieldNumber,
- WireFormatLite::WIRETYPE_VARINT));
- writer.WriteVarint32SignExtended(
- mesos::v1::master::Response::GET_EXECUTORS);
-
- string serializedGetExecutors = serializeGetExecutors(approvers);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::kGetExecutorsFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(serializedGetExecutors.size());
- writer.WriteString(serializedGetExecutors);
+ WireFormatLite::WriteEnum(
+ mesos::v1::master::Response::kTypeFieldNumber,
+ mesos::v1::master::Response::GET_EXECUTORS,
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::kGetExecutorsFieldNumber,
+ serializeGetExecutors(approvers),
+ &writer);
// We must manually trim the unused buffer space since
// we use the string before the coded output stream is
@@ -1744,21 +1727,17 @@ string Master::Http::serializeGetExecutors(
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::GetExecutors::Executor
- ::kExecutorInfoFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(e.ByteSizeLong());
- e.SerializeToCodedStream(&writer);
-
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::GetExecutors::Executor
- ::kAgentIdFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(s.ByteSizeLong());
- s.SerializeToCodedStream(&writer);
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::v1::master::Response::GetExecutors::Executor
+ ::kExecutorInfoFieldNumber,
+ e,
+ &writer);
+
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::v1::master::Response::GetExecutors::Executor
+ ::kAgentIdFieldNumber,
+ s,
+ &writer);
// While an explicit Trim() isn't necessary (since the coded
// output stream is destructed before the string is returned),
@@ -1795,14 +1774,10 @@ string Master::Http::serializeGetExecutors(
continue;
}
- string serializedExecutor = serializeExecutor(executorInfo, slaveId);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::GetExecutors
- ::kExecutorsFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(serializedExecutor.size());
- writer.WriteString(serializedExecutor);
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::GetExecutors::kExecutorsFieldNumber,
+ serializeExecutor(executorInfo, slaveId),
+ &writer);
}
}
}
@@ -1894,20 +1869,15 @@ Future<Response> Master::Http::getState(
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::kTypeFieldNumber,
- WireFormatLite::WIRETYPE_VARINT));
- writer.WriteVarint32SignExtended(
- mesos::v1::master::Response::GET_STATE);
-
- string serializedGetState = serializeGetState(approvers);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::kGetStateFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(serializedGetState.size());
- writer.WriteString(serializedGetState);
+ WireFormatLite::WriteEnum(
+ mesos::v1::master::Response::kTypeFieldNumber,
+ mesos::v1::master::Response::GET_STATE,
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::kGetStateFieldNumber,
+ serializeGetState(approvers),
+ &writer);
// We must manually trim the unused buffer space since
// we use the string before the coded output stream is
@@ -2004,37 +1974,25 @@ string Master::Http::serializeGetState(
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
- string serializedGetTasks = serializeGetTasks(approvers);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::GetState::kGetTasksFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(serializedGetTasks.size());
- writer.WriteString(serializedGetTasks);
-
- string serializedGetExecutors = serializeGetExecutors(approvers);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::GetState::kGetExecutorsFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(serializedGetExecutors.size());
- writer.WriteString(serializedGetExecutors);
-
- string serializedGetFrameworks = serializeGetFrameworks(approvers);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::GetState::kGetFrameworksFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(serializedGetFrameworks.size());
- writer.WriteString(serializedGetFrameworks);
-
- string serializedGetAgents = serializeGetAgents(approvers);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::GetState::kGetAgentsFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(serializedGetAgents.size());
- writer.WriteString(serializedGetAgents);
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::GetState::kGetTasksFieldNumber,
+ serializeGetTasks(approvers),
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::GetState::kGetExecutorsFieldNumber,
+ serializeGetExecutors(approvers),
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::GetState::kGetFrameworksFieldNumber,
+ serializeGetFrameworks(approvers),
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::GetState::kGetAgentsFieldNumber,
+ serializeGetAgents(approvers),
+ &writer);
// While an explicit Trim() isn't necessary (since the coded
// output stream is destructed before the string is returned),
@@ -2324,18 +2282,11 @@ string serializeGetMetrics(const map<string, double>& metrics)
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::Metric::kNameFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(key.size());
- writer.WriteString(key);
+ WireFormatLite::WriteString(
+ mesos::v1::Metric::kNameFieldNumber, key, &writer);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::Metric::kValueFieldNumber,
- WireFormatLite::WIRETYPE_FIXED64));
- writer.WriteLittleEndian64(WireFormatLite::EncodeDouble(value));
+ WireFormatLite::WriteDouble(
+ mesos::v1::Metric::kValueFieldNumber, value, &writer);
// While an explicit Trim() isn't necessary (since the coded
// output stream is destructed before the string is returned),
@@ -2351,13 +2302,10 @@ string serializeGetMetrics(const map<string, double>& metrics)
google::protobuf::io::CodedOutputStream writer(&stream);
foreachpair (const string& key, double value, metrics) {
- string serializedMetric = serializeMetric(key, value);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::GetMetrics::kMetricsFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(serializedMetric.size());
- writer.WriteString(serializedMetric);
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::GetMetrics::kMetricsFieldNumber,
+ serializeMetric(key, value),
+ &writer);
}
// While an explicit Trim() isn't necessary (since the coded
@@ -2397,20 +2345,15 @@ Future<Response> Master::Http::getMetrics(
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::kTypeFieldNumber,
- WireFormatLite::WIRETYPE_VARINT));
- writer.WriteVarint32SignExtended(
- mesos::v1::master::Response::GET_METRICS);
-
- string serializedGetMetrics = serializeGetMetrics(metrics);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::kGetMetricsFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(serializedGetMetrics.size());
- writer.WriteString(serializedGetMetrics);
+ WireFormatLite::WriteEnum(
+ mesos::v1::master::Response::kTypeFieldNumber,
+ mesos::v1::master::Response::GET_METRICS,
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::kGetMetricsFieldNumber,
+ serializeGetMetrics(metrics),
+ &writer);
// We must manually trim the unused buffer space since
// we use the string before the coded output stream is
@@ -2855,20 +2798,15 @@ Future<Response> Master::Http::getAgents(
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::kTypeFieldNumber,
- WireFormatLite::WIRETYPE_VARINT));
- writer.WriteVarint32SignExtended(
- mesos::v1::master::Response::GET_AGENTS);
-
- string serializedGetAgents = serializeGetAgents(approvers);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::kGetAgentsFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(serializedGetAgents.size());
- writer.WriteString(serializedGetAgents);
+ WireFormatLite::WriteEnum(
+ mesos::v1::master::Response::kTypeFieldNumber,
+ mesos::v1::master::Response::GET_AGENTS,
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::kGetAgentsFieldNumber,
+ serializeGetAgents(approvers),
+ &writer);
// We must manually trim the unused buffer space since
// we use the string before the coded output stream is
@@ -3003,19 +2941,14 @@ string Master::Http::serializeGetAgents(
foreachvalue (const Slave* slave, master->slaves.registered) {
// TODO(bmahler): Consider not constructing the temporary
// agent object and instead serialize directly.
- mesos::master::Response::GetAgents::Agent agent =
- protobuf::master::event::createAgentResponse(
- *slave,
- master->slaves.draining.get(slave->id),
- master->slaves.deactivated.contains(slave->id),
- approvers);
-
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::master::Response::GetAgents::kAgentsFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(agent.ByteSizeLong());
- agent.SerializeToCodedStream(&writer);
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::master::Response::GetAgents::kAgentsFieldNumber,
+ protobuf::master::event::createAgentResponse(
+ *slave,
+ master->slaves.draining.get(slave->id),
+ master->slaves.deactivated.contains(slave->id),
+ approvers),
+ &writer);
}
foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
@@ -3029,12 +2962,10 @@ string Master::Http::serializeGetAgents(
}
}
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::master::Response::GetAgents::kRecoveredAgentsFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(agent.ByteSizeLong());
- agent.SerializeToCodedStream(&writer);
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::master::Response::GetAgents::kRecoveredAgentsFieldNumber,
+ agent,
+ &writer);
}
// While an explicit Trim() isn't necessary (since the coded
@@ -4017,20 +3948,15 @@ Future<Response> Master::Http::getTasks(
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::kTypeFieldNumber,
- WireFormatLite::WIRETYPE_VARINT));
- writer.WriteVarint32SignExtended(
- mesos::v1::master::Response::GET_TASKS);
-
- string serializedGetTasks = serializeGetTasks(approvers);
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::kGetTasksFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(serializedGetTasks.size());
- writer.WriteString(serializedGetTasks);
+ WireFormatLite::WriteEnum(
+ mesos::v1::master::Response::kTypeFieldNumber,
+ mesos::v1::master::Response::GET_TASKS,
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::kGetTasksFieldNumber,
+ serializeGetTasks(approvers),
+ &writer);
// We must manually trim the unused buffer space since
// we use the string before the coded output stream is
@@ -4237,13 +4163,10 @@ string Master::Http::serializeGetTasks(
//
// *getTasks.add_pending_tasks() =
// protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
- Task task = protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::GetTasks::kPendingTasksFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(task.ByteSizeLong());
- task.SerializeToCodedStream(&writer);
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::v1::master::Response::GetTasks::kPendingTasksFieldNumber,
+ protobuf::createTask(taskInfo, TASK_STAGING, framework->id()),
+ &writer);
}
// Active tasks.
@@ -4254,12 +4177,10 @@ string Master::Http::serializeGetTasks(
continue;
}
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::GetTasks::kTasksFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(task->ByteSizeLong());
- task->SerializeToCodedStream(&writer);
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::v1::master::Response::GetTasks::kTasksFieldNumber,
+ *task,
+ &writer);
}
// Unreachable tasks.
@@ -4269,13 +4190,10 @@ string Master::Http::serializeGetTasks(
continue;
}
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::GetTasks::
- kUnreachableTasksFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(task->ByteSizeLong());
- task->SerializeToCodedStream(&writer);
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::v1::master::Response::GetTasks::kUnreachableTasksFieldNumber,
+ *task,
+ &writer);
}
// Completed tasks.
@@ -4285,13 +4203,10 @@ string Master::Http::serializeGetTasks(
continue;
}
- // *getTasks.add_completed_tasks() = *task;
- writer.WriteTag(
- WireFormatLite::MakeTag(
- mesos::v1::master::Response::GetTasks::kCompletedTasksFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
- writer.WriteVarint32(task->ByteSizeLong());
- task->SerializeToCodedStream(&writer);
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::v1::master::Response::GetTasks::kCompletedTasksFieldNumber,
+ *task,
+ &writer);
}
}