You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/04/19 23:44:14 UTC
mesos git commit: Removed unnecessary Registry copying to improve
performance.
Repository: mesos
Updated Branches:
refs/heads/master 99acafa5e -> 3b70d417a
Removed unnecessary Registry copying to improve performance.
When the number of agents is large every `Registry` copy operation takes
a lot of time (~0.4 sec with 55k agents), because it involves deep
copying a big object tree. Because of that, the use of `protobuf::State`
in `Registrar` incurs a dramatic performance cost from multiple protobuf
copying.
This patch drops the use of `protobuf::State` in `Registrar` in favor of
"untyped" `State` and manual serialization/deserialization in order to
minimize `Registry` copying and keep registry update timings at
acceptable values.
Performance improvements to `protobuf::State` should be explored in
order to make it usable in the registrar without regressing on the
performance of this approach.
Review: https://reviews.apache.org/r/58355/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3b70d417
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3b70d417
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3b70d417
Branch: refs/heads/master
Commit: 3b70d417a8642eeb0efb562d45cc0f7a7809f54f
Parents: 99acafa
Author: Ilya Pronin <ip...@twopensource.com>
Authored: Wed Apr 19 16:22:15 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Apr 19 16:42:51 2017 -0700
----------------------------------------------------------------------
src/master/main.cpp | 5 +-
src/master/registrar.cpp | 127 +++++++++++++++++++++++++------------
src/master/registrar.hpp | 4 +-
src/tests/cluster.cpp | 2 +-
src/tests/cluster.hpp | 4 +-
src/tests/mock_registrar.cpp | 4 +-
src/tests/mock_registrar.hpp | 4 +-
src/tests/registrar_tests.cpp | 4 +-
8 files changed, 98 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index 90d159e..95a482b 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -37,7 +37,7 @@
#ifndef __WINDOWS__
#include <mesos/state/log.hpp>
#endif // __WINDOWS__
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
#include <mesos/state/storage.hpp>
#include <mesos/zookeeper/detector.hpp>
@@ -392,8 +392,7 @@ int main(int argc, char** argv)
CHECK_NOTNULL(storage);
- mesos::state::protobuf::State* state =
- new mesos::state::protobuf::State(storage);
+ mesos::state::State* state = new mesos::state::State(storage);
Registrar* registrar =
new Registrar(flags, state, READONLY_HTTP_AUTHENTICATION_REALM);
http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/master/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp
index 0029cc7..82ec3d9 100644
--- a/src/master/registrar.cpp
+++ b/src/master/registrar.cpp
@@ -19,7 +19,7 @@
#include <mesos/type_utils.hpp>
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
@@ -44,8 +44,8 @@
#include "master/registrar.hpp"
#include "master/registry.hpp"
-using mesos::state::protobuf::State;
-using mesos::state::protobuf::Variable;
+using mesos::state::State;
+using mesos::state::Variable;
using process::dispatch;
using process::spawn;
@@ -89,9 +89,9 @@ public:
const Option<string>& _authenticationRealm)
: ProcessBase(process::ID::generate("registrar")),
metrics(*this),
+ state(_state),
updating(false),
flags(_flags),
- state(_state),
authenticationRealm(_authenticationRealm) {}
virtual ~RegistrarProcess() {}
@@ -108,19 +108,20 @@ protected:
"/registry",
authenticationRealm.get(),
registryHelp(),
- &RegistrarProcess::registry);
+ &RegistrarProcess::getRegistry);
} else {
route(
"/registry",
registryHelp(),
- lambda::bind(&RegistrarProcess::registry, this, lambda::_1, None()));
+ lambda::bind(
+ &RegistrarProcess::getRegistry, this, lambda::_1, None()));
}
}
private:
// HTTP handlers.
// /registrar(N)/registry
- Future<Response> registry(
+ Future<Response> getRegistry(
const Request& request,
const Option<Principal>&);
static string registryHelp();
@@ -186,8 +187,8 @@ private:
Future<double> _registry_size_bytes()
{
- if (variable.isSome()) {
- return variable.get().get().ByteSize();
+ if (registry.isSome()) {
+ return registry->ByteSize();
}
return Failure("Not recovered yet");
@@ -196,14 +197,15 @@ private:
// Continuations.
void _recover(
const MasterInfo& info,
- const Future<Variable<Registry>>& recovery);
+ const Future<Variable>& recovery);
void __recover(const Future<bool>& recover);
Future<bool> _apply(Owned<Operation> operation);
// Helper for updating state (performing store).
void update();
void _update(
- const Future<Option<Variable<Registry>>>& store,
+ const Future<Option<Variable>>& store,
+ const Owned<Registry>& updatedRegistry,
deque<Owned<Operation>> operations);
// Fails all pending operations and transitions the Registrar
@@ -212,12 +214,24 @@ private:
// performing more State storage operations.
void abort(const string& message);
- Option<Variable<Registry>> variable;
+ // TODO(ipronin): We use the "untyped" `State` class here and perform
+ // the protobuf (de)serialization manually within the Registrar, because
+ // the use of `protobuf::State` incurs a dramatic performance cost from
+ // protobuf copying. We should explore using `protobuf::State`, which will
+ // require move support and other copy elimination to maintain the
+ // performance of the current approach.
+ State* state;
+
+ // Per the TODO above, we store both serialized and deserialized versions
+ // of the `Registry` protobuf. If we're able to move to `protobuf::State`,
+ // we could just store a single `protobuf::state::Variable<Registry>`.
+ Option<Variable> variable;
+ Option<Registry> registry;
+
deque<Owned<Operation>> operations;
bool updating; // Used to signify fetching (recovering) or storing.
const Flags flags;
- State* state;
// Used to compose our operations with recovery.
Option<Owned<Promise<Registry>>> recovered;
@@ -256,14 +270,14 @@ void fail(deque<Owned<Operation>>* operations, const string& message)
}
-Future<Response> RegistrarProcess::registry(
+Future<Response> RegistrarProcess::getRegistry(
const Request& request,
const Option<Principal>&)
{
JSON::Object result;
- if (variable.isSome()) {
- result = JSON::protobuf(variable.get().get());
+ if (registry.isSome()) {
+ result = JSON::protobuf(registry.get());
}
return OK(result, request.url.query.get("jsonp"));
@@ -331,10 +345,10 @@ Future<Registry> RegistrarProcess::recover(const MasterInfo& info)
VLOG(1) << "Recovering registrar";
metrics.state_fetch.start();
- state->fetch<Registry>("registry")
+ state->fetch("registry")
.after(flags.registry_fetch_timeout,
lambda::bind(
- &timeout<Variable<Registry>>,
+ &timeout<Variable>,
"fetch",
flags.registry_fetch_timeout,
lambda::_1))
@@ -349,7 +363,7 @@ Future<Registry> RegistrarProcess::recover(const MasterInfo& info)
void RegistrarProcess::_recover(
const MasterInfo& info,
- const Future<Variable<Registry>>& recovery)
+ const Future<Variable>& recovery)
{
updating = false;
@@ -358,24 +372,38 @@ void RegistrarProcess::_recover(
if (!recovery.isReady()) {
recovered.get()->fail("Failed to recover registrar: " +
(recovery.isFailed() ? recovery.failure() : "discarded"));
- } else {
- Duration elapsed = metrics.state_fetch.stop();
+ return;
+ }
- LOG(INFO) << "Successfully fetched the registry"
- << " (" << Bytes(recovery.get().get().ByteSize()) << ")"
- << " in " << elapsed;
+ // Deserialize the registry.
+ Try<Registry> deserialized =
+ ::protobuf::deserialize<Registry>(recovery->value());
+ if (deserialized.isError()) {
+ recovered.get()->fail("Failed to recover registrar: " +
+ deserialized.error());
+ return;
+ }
- // Save the registry.
- variable = recovery.get();
+ Duration elapsed = metrics.state_fetch.stop();
- // Perform the Recover operation to add the new MasterInfo.
- Owned<Operation> operation(new Recover(info));
- operations.push_back(operation);
- operation->future()
- .onAny(defer(self(), &Self::__recover, lambda::_1));
+ LOG(INFO) << "Successfully fetched the registry"
+ << " (" << Bytes(deserialized->ByteSize()) << ")"
+ << " in " << elapsed;
- update();
- }
+ // Save the registry.
+ variable = recovery.get();
+
+ // Workaround for immovable protobuf messages.
+ registry = Option<Registry>(Registry());
+ registry->Swap(&deserialized.get());
+
+ // Perform the Recover operation to add the new MasterInfo.
+ Owned<Operation> operation(new Recover(info));
+ operations.push_back(operation);
+ operation->future()
+ .onAny(defer(self(), &Self::__recover, lambda::_1));
+
+ update();
}
@@ -397,7 +425,8 @@ void RegistrarProcess::__recover(const Future<bool>& recover)
// the Registry with the latest MasterInfo.
// Set the promise and un-gate any pending operations.
CHECK_SOME(variable);
- recovered.get()->set(variable.get().get());
+ CHECK_SOME(registry);
+ recovered.get()->set(registry.get());
}
}
@@ -446,18 +475,19 @@ void RegistrarProcess::update()
updating = true;
- // Create a snapshot of the current registry.
- Registry registry = variable.get().get();
+ // Create a snapshot of the current registry. We use an `Owned` here
+ // to avoid copying, since protobuf doesn't support move construction.
+ auto updatedRegistry = Owned<Registry>(new Registry(registry.get()));
// Create the 'slaveIDs' accumulator.
hashset<SlaveID> slaveIDs;
- foreach (const Registry::Slave& slave, registry.slaves().slaves()) {
+ foreach (const Registry::Slave& slave, updatedRegistry->slaves().slaves()) {
slaveIDs.insert(slave.info().id());
}
foreach (Owned<Operation>& operation, operations) {
// No need to process the result of the operation.
- (*operation)(&registry, &slaveIDs);
+ (*operation)(updatedRegistry.get(), &slaveIDs);
}
LOG(INFO) << "Applied " << operations.size() << " operations in "
@@ -465,14 +495,25 @@ void RegistrarProcess::update()
// Perform the store, and time the operation.
metrics.state_store.start();
- state->store(variable.get().mutate(registry))
+
+ // Serialize updated registry.
+ Try<string> serialized = ::protobuf::serialize(*updatedRegistry);
+ if (serialized.isError()) {
+ string message = "Failed to update registry: " + serialized.error();
+ fail(&operations, message);
+ abort(message);
+ return;
+ }
+
+ state->store(variable->mutate(serialized.get()))
.after(flags.registry_store_timeout,
lambda::bind(
- &timeout<Option<Variable<Registry>>>,
+ &timeout<Option<Variable>>,
"store",
flags.registry_store_timeout,
lambda::_1))
- .onAny(defer(self(), &Self::_update, lambda::_1, operations));
+ .onAny(defer(
+ self(), &Self::_update, lambda::_1, updatedRegistry, operations));
// Clear the operations, _update will transition the Promises!
operations.clear();
@@ -480,7 +521,8 @@ void RegistrarProcess::update()
void RegistrarProcess::_update(
- const Future<Option<Variable<Registry>>>& store,
+ const Future<Option<Variable>>& store,
+ const Owned<Registry>& updatedRegistry,
deque<Owned<Operation>> applied)
{
updating = false;
@@ -508,6 +550,7 @@ void RegistrarProcess::_update(
LOG(INFO) << "Successfully updated the registry in " << elapsed;
variable = store.get().get();
+ registry->Swap(updatedRegistry.get());
// Remove the operations.
while (!applied.empty()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/master/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.hpp b/src/master/registrar.hpp
index a70132b..c439f6a 100644
--- a/src/master/registrar.hpp
+++ b/src/master/registrar.hpp
@@ -19,7 +19,7 @@
#include <mesos/mesos.hpp>
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
#include <process/future.hpp>
#include <process/owned.hpp>
@@ -92,7 +92,7 @@ class Registrar
{
public:
Registrar(const Flags& flags,
- mesos::state::protobuf::State* state,
+ mesos::state::State* state,
const Option<std::string>& authenticationRealm = None());
virtual ~Registrar();
http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/cluster.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.cpp b/src/tests/cluster.cpp
index 02590a2..a4f57e0 100644
--- a/src/tests/cluster.cpp
+++ b/src/tests/cluster.cpp
@@ -254,7 +254,7 @@ Try<process::Owned<Master>> Master::start(
}
// Instantiate some other master dependencies.
- master->state.reset(new mesos::state::protobuf::State(master->storage.get()));
+ master->state.reset(new mesos::state::State(master->storage.get()));
master->registrar.reset(new MockRegistrar(
flags, master->state.get(), master::READONLY_HTTP_AUTHENTICATION_REALM));
http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 250b12f..6563412 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -34,7 +34,7 @@
#include <mesos/state/in_memory.hpp>
#include <mesos/state/log.hpp>
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
#include <mesos/state/storage.hpp>
#include <mesos/zookeeper/url.hpp>
@@ -123,7 +123,7 @@ private:
process::Owned<mesos::master::detector::MasterDetector> detector;
process::Owned<mesos::log::Log> log;
process::Owned<mesos::state::Storage> storage;
- process::Owned<mesos::state::protobuf::State> state;
+ process::Owned<mesos::state::State> state;
public:
// Exposed for testing and mocking purposes. We always use a
// `MockRegistrar` in case the test case wants to inspect how the
http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/mock_registrar.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_registrar.cpp b/src/tests/mock_registrar.cpp
index 8643e4c..0a877b2 100644
--- a/src/tests/mock_registrar.cpp
+++ b/src/tests/mock_registrar.cpp
@@ -18,7 +18,7 @@
#include <gmock/gmock.h>
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
#include <process/future.hpp>
#include <process/owned.hpp>
@@ -46,7 +46,7 @@ namespace tests {
MockRegistrar::MockRegistrar(
const master::Flags& flags,
- mesos::state::protobuf::State* state,
+ mesos::state::State* state,
const Option<string>& authenticationRealm)
: Registrar(flags, state, authenticationRealm)
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/mock_registrar.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_registrar.hpp b/src/tests/mock_registrar.hpp
index cdcc699..92c3994 100644
--- a/src/tests/mock_registrar.hpp
+++ b/src/tests/mock_registrar.hpp
@@ -21,7 +21,7 @@
#include <gmock/gmock.h>
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
#include <process/future.hpp>
#include <process/owned.hpp>
@@ -40,7 +40,7 @@ class MockRegistrar : public mesos::internal::master::Registrar
{
public:
MockRegistrar(const master::Flags& flags,
- mesos::state::protobuf::State* state,
+ mesos::state::State* state,
const Option<std::string>& authenticationRealm = None());
virtual ~MockRegistrar();
http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/registrar_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index 5c6fb56..e2c38d3 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -29,7 +29,7 @@
#include <mesos/log/log.hpp>
#include <mesos/state/log.hpp>
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
#include <mesos/state/storage.hpp>
#include <process/clock.hpp>
@@ -113,8 +113,8 @@ using namespace mesos::internal::master::weights;
using mesos::http::authentication::BasicAuthenticatorFactory;
using mesos::state::LogStorage;
+using mesos::state::State;
using mesos::state::Storage;
-using mesos::state::protobuf::State;
using state::Entry;