You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/04/19 23:44:14 UTC

mesos git commit: Removed unnecessary Registry copying to improve performance.

Repository: mesos
Updated Branches:
  refs/heads/master 99acafa5e -> 3b70d417a


Removed unnecessary Registry copying to improve performance.

When the number of agents is large, every `Registry` copy operation takes
a lot of time (~0.4 sec with 55k agents) because it involves deep-copying
a big object tree. Because of that, the use of `protobuf::State`
in `Registrar` incurs a dramatic performance cost from repeated protobuf
copying.

This patch drops the use of `protobuf::State` in `Registrar` in favor of
"untyped" `State` and manual serialization/deserialization in order to
minimize `Registry` copying and keep registry update timings at
acceptable values.

Performance improvements to `protobuf::State` should be explored in
order to make it usable in the registrar without regressing on the
performance of this approach.

Review: https://reviews.apache.org/r/58355/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3b70d417
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3b70d417
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3b70d417

Branch: refs/heads/master
Commit: 3b70d417a8642eeb0efb562d45cc0f7a7809f54f
Parents: 99acafa
Author: Ilya Pronin <ip...@twopensource.com>
Authored: Wed Apr 19 16:22:15 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Apr 19 16:42:51 2017 -0700

----------------------------------------------------------------------
 src/master/main.cpp           |   5 +-
 src/master/registrar.cpp      | 127 +++++++++++++++++++++++++------------
 src/master/registrar.hpp      |   4 +-
 src/tests/cluster.cpp         |   2 +-
 src/tests/cluster.hpp         |   4 +-
 src/tests/mock_registrar.cpp  |   4 +-
 src/tests/mock_registrar.hpp  |   4 +-
 src/tests/registrar_tests.cpp |   4 +-
 8 files changed, 98 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index 90d159e..95a482b 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -37,7 +37,7 @@
 #ifndef __WINDOWS__
 #include <mesos/state/log.hpp>
 #endif // __WINDOWS__
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
 #include <mesos/state/storage.hpp>
 
 #include <mesos/zookeeper/detector.hpp>
@@ -392,8 +392,7 @@ int main(int argc, char** argv)
 
   CHECK_NOTNULL(storage);
 
-  mesos::state::protobuf::State* state =
-    new mesos::state::protobuf::State(storage);
+  mesos::state::State* state = new mesos::state::State(storage);
   Registrar* registrar =
     new Registrar(flags, state, READONLY_HTTP_AUTHENTICATION_REALM);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/master/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp
index 0029cc7..82ec3d9 100644
--- a/src/master/registrar.cpp
+++ b/src/master/registrar.cpp
@@ -19,7 +19,7 @@
 
 #include <mesos/type_utils.hpp>
 
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
 
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
@@ -44,8 +44,8 @@
 #include "master/registrar.hpp"
 #include "master/registry.hpp"
 
-using mesos::state::protobuf::State;
-using mesos::state::protobuf::Variable;
+using mesos::state::State;
+using mesos::state::Variable;
 
 using process::dispatch;
 using process::spawn;
@@ -89,9 +89,9 @@ public:
       const Option<string>& _authenticationRealm)
     : ProcessBase(process::ID::generate("registrar")),
       metrics(*this),
+      state(_state),
       updating(false),
       flags(_flags),
-      state(_state),
       authenticationRealm(_authenticationRealm) {}
 
   virtual ~RegistrarProcess() {}
@@ -108,19 +108,20 @@ protected:
           "/registry",
           authenticationRealm.get(),
           registryHelp(),
-          &RegistrarProcess::registry);
+          &RegistrarProcess::getRegistry);
     } else {
       route(
           "/registry",
           registryHelp(),
-          lambda::bind(&RegistrarProcess::registry, this, lambda::_1, None()));
+          lambda::bind(
+              &RegistrarProcess::getRegistry, this, lambda::_1, None()));
     }
   }
 
 private:
   // HTTP handlers.
   // /registrar(N)/registry
-  Future<Response> registry(
+  Future<Response> getRegistry(
       const Request& request,
       const Option<Principal>&);
   static string registryHelp();
@@ -186,8 +187,8 @@ private:
 
   Future<double> _registry_size_bytes()
   {
-    if (variable.isSome()) {
-      return variable.get().get().ByteSize();
+    if (registry.isSome()) {
+      return registry->ByteSize();
     }
 
     return Failure("Not recovered yet");
@@ -196,14 +197,15 @@ private:
   // Continuations.
   void _recover(
       const MasterInfo& info,
-      const Future<Variable<Registry>>& recovery);
+      const Future<Variable>& recovery);
   void __recover(const Future<bool>& recover);
   Future<bool> _apply(Owned<Operation> operation);
 
   // Helper for updating state (performing store).
   void update();
   void _update(
-      const Future<Option<Variable<Registry>>>& store,
+      const Future<Option<Variable>>& store,
+      const Owned<Registry>& updatedRegistry,
       deque<Owned<Operation>> operations);
 
   // Fails all pending operations and transitions the Registrar
@@ -212,12 +214,24 @@ private:
   // performing more State storage operations.
   void abort(const string& message);
 
-  Option<Variable<Registry>> variable;
+  // TODO(ipronin): We use the "untyped" `State` class here and perform
+  // the protobuf (de)serialization manually within the Registrar, because
+  // the use of `protobuf::State` incurs a dramatic performance cost from
+  // protobuf copying. We should explore using `protobuf::State`, which will
+  // require move support and other copy elimination to maintain the
+  // performance of the current approach.
+  State* state;
+
+  // Per the TODO above, we store both serialized and deserialized versions
+  // of the `Registry` protobuf. If we're able to move to `protobuf::State`,
+  // we could just store a single `state::protobuf::Variable<Registry>`.
+  Option<Variable> variable;
+  Option<Registry> registry;
+
   deque<Owned<Operation>> operations;
   bool updating; // Used to signify fetching (recovering) or storing.
 
   const Flags flags;
-  State* state;
 
   // Used to compose our operations with recovery.
   Option<Owned<Promise<Registry>>> recovered;
@@ -256,14 +270,14 @@ void fail(deque<Owned<Operation>>* operations, const string& message)
 }
 
 
-Future<Response> RegistrarProcess::registry(
+Future<Response> RegistrarProcess::getRegistry(
     const Request& request,
     const Option<Principal>&)
 {
   JSON::Object result;
 
-  if (variable.isSome()) {
-    result = JSON::protobuf(variable.get().get());
+  if (registry.isSome()) {
+    result = JSON::protobuf(registry.get());
   }
 
   return OK(result, request.url.query.get("jsonp"));
@@ -331,10 +345,10 @@ Future<Registry> RegistrarProcess::recover(const MasterInfo& info)
     VLOG(1) << "Recovering registrar";
 
     metrics.state_fetch.start();
-    state->fetch<Registry>("registry")
+    state->fetch("registry")
       .after(flags.registry_fetch_timeout,
              lambda::bind(
-                 &timeout<Variable<Registry>>,
+                 &timeout<Variable>,
                  "fetch",
                  flags.registry_fetch_timeout,
                  lambda::_1))
@@ -349,7 +363,7 @@ Future<Registry> RegistrarProcess::recover(const MasterInfo& info)
 
 void RegistrarProcess::_recover(
     const MasterInfo& info,
-    const Future<Variable<Registry>>& recovery)
+    const Future<Variable>& recovery)
 {
   updating = false;
 
@@ -358,24 +372,38 @@ void RegistrarProcess::_recover(
   if (!recovery.isReady()) {
     recovered.get()->fail("Failed to recover registrar: " +
         (recovery.isFailed() ? recovery.failure() : "discarded"));
-  } else {
-    Duration elapsed = metrics.state_fetch.stop();
+    return;
+  }
 
-    LOG(INFO) << "Successfully fetched the registry"
-              << " (" << Bytes(recovery.get().get().ByteSize()) << ")"
-              << " in " << elapsed;
+  // Deserialize the registry.
+  Try<Registry> deserialized =
+    ::protobuf::deserialize<Registry>(recovery->value());
+  if (deserialized.isError()) {
+    recovered.get()->fail("Failed to recover registrar: " +
+                          deserialized.error());
+    return;
+  }
 
-    // Save the registry.
-    variable = recovery.get();
+  Duration elapsed = metrics.state_fetch.stop();
 
-    // Perform the Recover operation to add the new MasterInfo.
-    Owned<Operation> operation(new Recover(info));
-    operations.push_back(operation);
-    operation->future()
-      .onAny(defer(self(), &Self::__recover, lambda::_1));
+  LOG(INFO) << "Successfully fetched the registry"
+            << " (" << Bytes(deserialized->ByteSize()) << ")"
+            << " in " << elapsed;
 
-    update();
-  }
+  // Save the registry.
+  variable = recovery.get();
+
+  // Workaround for immovable protobuf messages.
+  registry = Option<Registry>(Registry());
+  registry->Swap(&deserialized.get());
+
+  // Perform the Recover operation to add the new MasterInfo.
+  Owned<Operation> operation(new Recover(info));
+  operations.push_back(operation);
+  operation->future()
+    .onAny(defer(self(), &Self::__recover, lambda::_1));
+
+  update();
 }
 
 
@@ -397,7 +425,8 @@ void RegistrarProcess::__recover(const Future<bool>& recover)
     // the Registry with the latest MasterInfo.
     // Set the promise and un-gate any pending operations.
     CHECK_SOME(variable);
-    recovered.get()->set(variable.get().get());
+    CHECK_SOME(registry);
+    recovered.get()->set(registry.get());
   }
 }
 
@@ -446,18 +475,19 @@ void RegistrarProcess::update()
 
   updating = true;
 
-  // Create a snapshot of the current registry.
-  Registry registry = variable.get().get();
+  // Create a snapshot of the current registry. We use an `Owned` here
+  // to avoid copying, since protobuf doesn't support move construction.
+  auto updatedRegistry = Owned<Registry>(new Registry(registry.get()));
 
   // Create the 'slaveIDs' accumulator.
   hashset<SlaveID> slaveIDs;
-  foreach (const Registry::Slave& slave, registry.slaves().slaves()) {
+  foreach (const Registry::Slave& slave, updatedRegistry->slaves().slaves()) {
     slaveIDs.insert(slave.info().id());
   }
 
   foreach (Owned<Operation>& operation, operations) {
     // No need to process the result of the operation.
-    (*operation)(&registry, &slaveIDs);
+    (*operation)(updatedRegistry.get(), &slaveIDs);
   }
 
   LOG(INFO) << "Applied " << operations.size() << " operations in "
@@ -465,14 +495,25 @@ void RegistrarProcess::update()
 
   // Perform the store, and time the operation.
   metrics.state_store.start();
-  state->store(variable.get().mutate(registry))
+
+  // Serialize updated registry.
+  Try<string> serialized = ::protobuf::serialize(*updatedRegistry);
+  if (serialized.isError()) {
+    string message = "Failed to update registry: " + serialized.error();
+    fail(&operations, message);
+    abort(message);
+    return;
+  }
+
+  state->store(variable->mutate(serialized.get()))
     .after(flags.registry_store_timeout,
            lambda::bind(
-               &timeout<Option<Variable<Registry>>>,
+               &timeout<Option<Variable>>,
                "store",
                flags.registry_store_timeout,
                lambda::_1))
-    .onAny(defer(self(), &Self::_update, lambda::_1, operations));
+    .onAny(defer(
+        self(), &Self::_update, lambda::_1, updatedRegistry, operations));
 
   // Clear the operations, _update will transition the Promises!
   operations.clear();
@@ -480,7 +521,8 @@ void RegistrarProcess::update()
 
 
 void RegistrarProcess::_update(
-    const Future<Option<Variable<Registry>>>& store,
+    const Future<Option<Variable>>& store,
+    const Owned<Registry>& updatedRegistry,
     deque<Owned<Operation>> applied)
 {
   updating = false;
@@ -508,6 +550,7 @@ void RegistrarProcess::_update(
   LOG(INFO) << "Successfully updated the registry in " << elapsed;
 
   variable = store.get().get();
+  registry->Swap(updatedRegistry.get());
 
   // Remove the operations.
   while (!applied.empty()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/master/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.hpp b/src/master/registrar.hpp
index a70132b..c439f6a 100644
--- a/src/master/registrar.hpp
+++ b/src/master/registrar.hpp
@@ -19,7 +19,7 @@
 
 #include <mesos/mesos.hpp>
 
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
 
 #include <process/future.hpp>
 #include <process/owned.hpp>
@@ -92,7 +92,7 @@ class Registrar
 {
 public:
   Registrar(const Flags& flags,
-            mesos::state::protobuf::State* state,
+            mesos::state::State* state,
             const Option<std::string>& authenticationRealm = None());
   virtual ~Registrar();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/cluster.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.cpp b/src/tests/cluster.cpp
index 02590a2..a4f57e0 100644
--- a/src/tests/cluster.cpp
+++ b/src/tests/cluster.cpp
@@ -254,7 +254,7 @@ Try<process::Owned<Master>> Master::start(
   }
 
   // Instantiate some other master dependencies.
-  master->state.reset(new mesos::state::protobuf::State(master->storage.get()));
+  master->state.reset(new mesos::state::State(master->storage.get()));
   master->registrar.reset(new MockRegistrar(
       flags, master->state.get(), master::READONLY_HTTP_AUTHENTICATION_REALM));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 250b12f..6563412 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -34,7 +34,7 @@
 
 #include <mesos/state/in_memory.hpp>
 #include <mesos/state/log.hpp>
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
 #include <mesos/state/storage.hpp>
 
 #include <mesos/zookeeper/url.hpp>
@@ -123,7 +123,7 @@ private:
   process::Owned<mesos::master::detector::MasterDetector> detector;
   process::Owned<mesos::log::Log> log;
   process::Owned<mesos::state::Storage> storage;
-  process::Owned<mesos::state::protobuf::State> state;
+  process::Owned<mesos::state::State> state;
 public:
   // Exposed for testing and mocking purposes. We always use a
   // `MockRegistrar` in case the test case wants to inspect how the

http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/mock_registrar.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_registrar.cpp b/src/tests/mock_registrar.cpp
index 8643e4c..0a877b2 100644
--- a/src/tests/mock_registrar.cpp
+++ b/src/tests/mock_registrar.cpp
@@ -18,7 +18,7 @@
 
 #include <gmock/gmock.h>
 
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
 
 #include <process/future.hpp>
 #include <process/owned.hpp>
@@ -46,7 +46,7 @@ namespace tests {
 
 MockRegistrar::MockRegistrar(
     const master::Flags& flags,
-    mesos::state::protobuf::State* state,
+    mesos::state::State* state,
     const Option<string>& authenticationRealm)
   : Registrar(flags, state, authenticationRealm)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/mock_registrar.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_registrar.hpp b/src/tests/mock_registrar.hpp
index cdcc699..92c3994 100644
--- a/src/tests/mock_registrar.hpp
+++ b/src/tests/mock_registrar.hpp
@@ -21,7 +21,7 @@
 
 #include <gmock/gmock.h>
 
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
 
 #include <process/future.hpp>
 #include <process/owned.hpp>
@@ -40,7 +40,7 @@ class MockRegistrar : public mesos::internal::master::Registrar
 {
 public:
   MockRegistrar(const master::Flags& flags,
-                mesos::state::protobuf::State* state,
+                mesos::state::State* state,
                 const Option<std::string>& authenticationRealm = None());
 
   virtual ~MockRegistrar();

http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/registrar_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index 5c6fb56..e2c38d3 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -29,7 +29,7 @@
 #include <mesos/log/log.hpp>
 
 #include <mesos/state/log.hpp>
-#include <mesos/state/protobuf.hpp>
+#include <mesos/state/state.hpp>
 #include <mesos/state/storage.hpp>
 
 #include <process/clock.hpp>
@@ -113,8 +113,8 @@ using namespace mesos::internal::master::weights;
 using mesos::http::authentication::BasicAuthenticatorFactory;
 
 using mesos::state::LogStorage;
+using mesos::state::State;
 using mesos::state::Storage;
-using mesos::state::protobuf::State;
 
 using state::Entry;