You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/04/24 02:04:39 UTC

git commit: Added metrics to the registrar.

Repository: mesos
Updated Branches:
  refs/heads/master c84cf9b72 -> 85559cc96


Added metrics to the registrar.

Review: https://reviews.apache.org/r/20581


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/85559cc9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/85559cc9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/85559cc9

Branch: refs/heads/master
Commit: 85559cc96fea471ecf9ad0de4dbc19d038854940
Parents: c84cf9b
Author: Dominic Hamon <dh...@twopensource.com>
Authored: Wed Apr 23 17:04:11 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Apr 23 17:04:37 2014 -0700

----------------------------------------------------------------------
 src/master/registrar.cpp | 63 ++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 60 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/85559cc9/src/master/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp
index 38040bd..ac65f20 100644
--- a/src/master/registrar.cpp
+++ b/src/master/registrar.cpp
@@ -28,6 +28,10 @@
 #include <process/owned.hpp>
 #include <process/process.hpp>
 
+#include <process/metrics/gauge.hpp>
+#include <process/metrics/metrics.hpp>
+#include <process/metrics/timer.hpp>
+
 #include <stout/lambda.hpp>
 #include <stout/none.hpp>
 #include <stout/nothing.hpp>
@@ -61,6 +65,9 @@ using process::USAGE;
 
 using process::http::OK;
 
+using process::metrics::Gauge;
+using process::metrics::Timer;
+
 using std::deque;
 using std::string;
 
@@ -76,6 +83,7 @@ class RegistrarProcess : public Process<RegistrarProcess>
 public:
   RegistrarProcess(const Flags& _flags, State* _state)
     : ProcessBase(process::ID::generate("registrar")),
+      gauges(*this),
       updating(false),
       flags(_flags),
       state(_state) {}
@@ -118,9 +126,50 @@ private:
     const MasterInfo info;
   };
 
-  Option<Variable<Registry> > variable;
-  deque<Owned<Operation> > operations;
-  bool updating; // Used to signify fetching (recovering) or storing.
+  // Metrics.
+  struct Gauges
+  {
+    explicit Gauges(const RegistrarProcess& process)
+      : queued_operations(
+          "registrar/queued_operations",
+          defer(&process, &RegistrarProcess::_queued_operations))
+    {
+      process::metrics::add(queued_operations);
+    }
+
+    ~Gauges()
+    {
+      process::metrics::remove(queued_operations);
+    }
+
+    Gauge queued_operations;
+  } gauges;
+
+  struct Timers
+  {
+    Timers()
+      : state_fetch("registrar/state_fetch"),
+        state_store("registrar/state_store", Days(1))
+    {
+      process::metrics::add(state_fetch);
+      process::metrics::add(state_store);
+    }
+
+    ~Timers()
+    {
+      process::metrics::remove(state_fetch);
+      process::metrics::remove(state_store);
+    }
+
+    Timer state_fetch;
+    Timer state_store;
+  } timers;
+
+  // Gauge handlers
+  double _queued_operations()
+  {
+    return operations.size();
+  }
 
   // Continuations.
   void _recover(
@@ -135,6 +184,10 @@ private:
       const Future<Option<Variable<Registry> > >& store,
       deque<Owned<Operation> > operations);
 
+  Option<Variable<Registry> > variable;
+  deque<Owned<Operation> > operations;
+  bool updating; // Used to signify fetching (recovering) or storing.
+
   const Flags flags;
   State* state;
 
@@ -218,6 +271,7 @@ Future<Registry> RegistrarProcess::recover(const MasterInfo& info)
 
   if (recovered.isNone()) {
     // TODO(benh): Don't wait forever to recover?
+    timers.state_fetch.start();
     state->fetch<Registry>("registry")
       .onAny(defer(self(), &Self::_recover, info, lambda::_1));
     updating = true;
@@ -233,6 +287,7 @@ void RegistrarProcess::_recover(
     const Future<Variable<Registry> >& recovery)
 {
   updating = false;
+  timers.state_fetch.stop();
 
   CHECK(!recovery.isPending());
 
@@ -332,6 +387,7 @@ void RegistrarProcess::update()
   // TODO(benh): Add a timeout so we don't wait forever.
 
   // Perform the store!
+  timers.state_store.start();
   state->store(variable.get().mutate(registry))
     .onAny(defer(self(), &Self::_update, lambda::_1, operations));
 
@@ -345,6 +401,7 @@ void RegistrarProcess::_update(
     deque<Owned<Operation> > applied)
 {
   updating = false;
+  timers.state_store.stop();
 
   // Set the variable if the storage operation succeeded.
   if (!store.isReady()) {