You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/04/24 02:04:39 UTC
git commit: Added metrics to the registrar.
Repository: mesos
Updated Branches:
refs/heads/master c84cf9b72 -> 85559cc96
Added metrics to the registrar.
Review: https://reviews.apache.org/r/20581
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/85559cc9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/85559cc9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/85559cc9
Branch: refs/heads/master
Commit: 85559cc96fea471ecf9ad0de4dbc19d038854940
Parents: c84cf9b
Author: Dominic Hamon <dh...@twopensource.com>
Authored: Wed Apr 23 17:04:11 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Apr 23 17:04:37 2014 -0700
----------------------------------------------------------------------
src/master/registrar.cpp | 63 ++++++++++++++++++++++++++++++++++++++++---
1 file changed, 60 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/85559cc9/src/master/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp
index 38040bd..ac65f20 100644
--- a/src/master/registrar.cpp
+++ b/src/master/registrar.cpp
@@ -28,6 +28,10 @@
#include <process/owned.hpp>
#include <process/process.hpp>
+#include <process/metrics/gauge.hpp>
+#include <process/metrics/metrics.hpp>
+#include <process/metrics/timer.hpp>
+
#include <stout/lambda.hpp>
#include <stout/none.hpp>
#include <stout/nothing.hpp>
@@ -61,6 +65,9 @@ using process::USAGE;
using process::http::OK;
+using process::metrics::Gauge;
+using process::metrics::Timer;
+
using std::deque;
using std::string;
@@ -76,6 +83,7 @@ class RegistrarProcess : public Process<RegistrarProcess>
public:
RegistrarProcess(const Flags& _flags, State* _state)
: ProcessBase(process::ID::generate("registrar")),
+ gauges(*this),
updating(false),
flags(_flags),
state(_state) {}
@@ -118,9 +126,50 @@ private:
const MasterInfo info;
};
- Option<Variable<Registry> > variable;
- deque<Owned<Operation> > operations;
- bool updating; // Used to signify fetching (recovering) or storing.
+ // Metrics.
+ struct Gauges
+ {
+ explicit Gauges(const RegistrarProcess& process)
+ : queued_operations(
+ "registrar/queued_operations",
+ defer(&process, &RegistrarProcess::_queued_operations))
+ {
+ process::metrics::add(queued_operations);
+ }
+
+ ~Gauges()
+ {
+ process::metrics::remove(queued_operations);
+ }
+
+ Gauge queued_operations;
+ } gauges;
+
+ struct Timers
+ {
+ Timers()
+ : state_fetch("registrar/state_fetch"),
+ state_store("registrar/state_store", Days(1))
+ {
+ process::metrics::add(state_fetch);
+ process::metrics::add(state_store);
+ }
+
+ ~Timers()
+ {
+ process::metrics::remove(state_fetch);
+ process::metrics::remove(state_store);
+ }
+
+ Timer state_fetch;
+ Timer state_store;
+ } timers;
+
+ // Gauge handlers
+ double _queued_operations()
+ {
+ return operations.size();
+ }
// Continuations.
void _recover(
@@ -135,6 +184,10 @@ private:
const Future<Option<Variable<Registry> > >& store,
deque<Owned<Operation> > operations);
+ Option<Variable<Registry> > variable;
+ deque<Owned<Operation> > operations;
+ bool updating; // Used to signify fetching (recovering) or storing.
+
const Flags flags;
State* state;
@@ -218,6 +271,7 @@ Future<Registry> RegistrarProcess::recover(const MasterInfo& info)
if (recovered.isNone()) {
// TODO(benh): Don't wait forever to recover?
+ timers.state_fetch.start();
state->fetch<Registry>("registry")
.onAny(defer(self(), &Self::_recover, info, lambda::_1));
updating = true;
@@ -233,6 +287,7 @@ void RegistrarProcess::_recover(
const Future<Variable<Registry> >& recovery)
{
updating = false;
+ timers.state_fetch.stop();
CHECK(!recovery.isPending());
@@ -332,6 +387,7 @@ void RegistrarProcess::update()
// TODO(benh): Add a timeout so we don't wait forever.
// Perform the store!
+ timers.state_store.start();
state->store(variable.get().mutate(registry))
.onAny(defer(self(), &Self::_update, lambda::_1, operations));
@@ -345,6 +401,7 @@ void RegistrarProcess::_update(
deque<Owned<Operation> > applied)
{
updating = false;
+ timers.state_store.stop();
// Set the variable if the storage operation succeeded.
if (!store.isReady()) {