You are viewing a plain-text version of this content. (The link to the canonical version was a hyperlink that is not preserved in this copy.)
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/04/24 21:13:21 UTC

git commit: Added message counters and a counter for slave timeouts during recovery in Master.

Repository: mesos
Updated Branches:
  refs/heads/master dd03b9f4a -> 63595c232


Added message counters and a counter for slave timeouts during recovery in Master.

Review: https://reviews.apache.org/r/20638


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/63595c23
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/63595c23
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/63595c23

Branch: refs/heads/master
Commit: 63595c232a58e97016789df5fcdf4893606d3e04
Parents: dd03b9f
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Wed Apr 23 14:58:55 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Thu Apr 24 11:53:08 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp      | 12 ++++++++
 src/master/master.hpp      | 62 ++++++++++++++++++++++++++++++++++++++++-
 src/master/registrar.cpp   | 41 +++++++++++----------------
 src/tests/master_tests.cpp | 21 ++++++++------
 4 files changed, 102 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/63595c23/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e66d07e..71d7362 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -741,6 +741,7 @@ void Master::visit(const MessageEvent& event)
   if (!elected()) {
     LOG(WARNING) << "Dropping '" << event.message->name << "' message since "
                  << "not elected yet";
+    ++metrics.dropped_messages;
     return;
   }
 
@@ -754,6 +755,7 @@ void Master::visit(const MessageEvent& event)
   if (!recovered.get().isReady()) {
     LOG(WARNING) << "Dropping '" << event.message->name << "' message since "
                  << "not recovered yet";
+    ++metrics.dropped_messages;
     return;
   }
 
@@ -851,6 +853,8 @@ void Master::recoveredSlavesTimeout(const Registry& registry)
                  << " (" << slave.info().hostname() << ") did not re-register "
                  << "within the timeout; removing it from the registrar";
 
+    ++metrics.recovery_slave_removals;
+
     slaves.recovered.erase(slave.info().id());
 
     if (flags.registry_strict) {
@@ -969,6 +973,8 @@ void Master::registerFramework(
     const UPID& from,
     const FrameworkInfo& frameworkInfo)
 {
+  ++metrics.framework_registration_messages;
+
   if (authenticating.contains(from)) {
     LOG(INFO) << "Queuing up registration request from " << from
               << " because authentication is still in progress";
@@ -1039,6 +1045,8 @@ void Master::reregisterFramework(
     const FrameworkInfo& frameworkInfo,
     bool failover)
 {
+  ++metrics.framework_reregistration_messages;
+
   if (authenticating.contains(from)) {
     LOG(INFO) << "Queuing up re-registration request from " << from
               << " because authentication is still in progress";
@@ -2081,6 +2089,8 @@ void Master::schedulerMessage(
 
 void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
 {
+  ++metrics.slave_registration_messages;
+
   // Check if this slave is already registered (because it retries).
   foreachvalue (Slave* slave, slaves.activated) {
     if (slave->pid == from) {
@@ -2167,6 +2177,8 @@ void Master::reregisterSlave(
     const vector<Task>& tasks,
     const vector<Archive::Framework>& completedFrameworks)
 {
+  ++metrics.slave_reregistration_messages;
+
   if (slaves.deactivated.get(slaveInfo.id()).isSome()) {
     // To compensate for the case where a non-strict registrar is
     // being used, we explicitly deny deactivated slaves from

http://git-wip-us.apache.org/repos/asf/mesos/blob/63595c23/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index f567a43..7c41de2 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -35,6 +35,9 @@
 #include <process/protobuf.hpp>
 #include <process/timer.hpp>
 
+#include <process/metrics/counter.hpp>
+#include <process/metrics/metrics.hpp>
+
 #include <stout/cache.hpp>
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
@@ -449,8 +452,10 @@ private:
   int64_t nextOfferId;     // Used to give each slot offer a unique ID.
   int64_t nextSlaveId;     // Used to give each slave a unique ID.
 
+  // TODO(bmahler): These are deprecated! Please use metrics instead.
   // Statistics (initialized in Master::initialize).
-  struct {
+  struct
+  {
     uint64_t tasks[TaskState_ARRAYSIZE];
     uint64_t validStatusUpdates;
     uint64_t invalidStatusUpdates;
@@ -458,6 +463,61 @@ private:
     uint64_t invalidFrameworkMessages;
   } stats;
 
+  struct Metrics
+  {
+    Metrics()
+      : dropped_messages(
+            "master/dropped_messages"),
+        framework_registration_messages(
+            "master/framework_registration_messages"),
+        framework_reregistration_messages(
+            "master/framework_reregistration_messages"),
+        slave_registration_messages(
+            "master/slave_registration_messages"),
+        slave_reregistration_messages(
+            "master/slave_reregistration_messages"),
+        recovery_slave_removals(
+            "master/recovery_slave_removals")
+    {
+      process::metrics::add(dropped_messages);
+
+      process::metrics::add(framework_registration_messages);
+      process::metrics::add(framework_reregistration_messages);
+
+      process::metrics::add(slave_registration_messages);
+      process::metrics::add(slave_reregistration_messages);
+
+      process::metrics::add(recovery_slave_removals);
+    }
+
+    ~Metrics()
+    {
+      process::metrics::remove(dropped_messages);
+
+      process::metrics::remove(framework_registration_messages);
+      process::metrics::remove(framework_reregistration_messages);
+
+      process::metrics::remove(slave_registration_messages);
+      process::metrics::remove(slave_reregistration_messages);
+
+      process::metrics::remove(recovery_slave_removals);
+    }
+
+    // Message counters.
+    // TODO(bmahler): Add counters for other messages: kill task,
+    // status update, etc.
+    process::metrics::Counter dropped_messages;
+
+    process::metrics::Counter framework_registration_messages;
+    process::metrics::Counter framework_reregistration_messages;
+
+    process::metrics::Counter slave_registration_messages;
+    process::metrics::Counter slave_reregistration_messages;
+
+    // Recovery counters.
+    process::metrics::Counter recovery_slave_removals;
+  } metrics;
+
   process::Time startTime; // Start time used to calculate uptime.
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/63595c23/src/master/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp
index b3ab572..79d4891 100644
--- a/src/master/registrar.cpp
+++ b/src/master/registrar.cpp
@@ -83,7 +83,7 @@ class RegistrarProcess : public Process<RegistrarProcess>
 public:
   RegistrarProcess(const Flags& _flags, State* _state)
     : ProcessBase(process::ID::generate("registrar")),
-      gauges(*this),
+      metrics(*this),
       updating(false),
       flags(_flags),
       state(_state) {}
@@ -127,43 +127,34 @@ private:
   };
 
   // Metrics.
-  struct Gauges
+  struct Metrics
   {
-    explicit Gauges(const RegistrarProcess& process)
+    explicit Metrics(const RegistrarProcess& process)
       : queued_operations(
           "registrar/queued_operations",
-          defer(&process, &RegistrarProcess::_queued_operations))
+          defer(process, &RegistrarProcess::_queued_operations)),
+        state_fetch("registrar/state_fetch"),
+        state_store("registrar/state_store", Days(1))
     {
       process::metrics::add(queued_operations);
-    }
-
-    ~Gauges()
-    {
-      process::metrics::remove(queued_operations);
-    }
-
-    Gauge queued_operations;
-  } gauges;
 
-  struct Timers
-  {
-    Timers()
-      : state_fetch("registrar/state_fetch"),
-        state_store("registrar/state_store", Days(1))
-    {
       process::metrics::add(state_fetch);
       process::metrics::add(state_store);
     }
 
-    ~Timers()
+    ~Metrics()
     {
+      process::metrics::remove(queued_operations);
+
       process::metrics::remove(state_fetch);
       process::metrics::remove(state_store);
     }
 
+    Gauge queued_operations;
+
     Timer state_fetch;
     Timer state_store;
-  } timers;
+  } metrics;
 
   // Gauge handlers
   double _queued_operations()
@@ -284,7 +275,7 @@ Future<Registry> RegistrarProcess::recover(const MasterInfo& info)
   LOG(INFO) << "Recovering registrar";
 
   if (recovered.isNone()) {
-    timers.state_fetch.start();
+    metrics.state_fetch.start();
     state->fetch<Registry>("registry")
       .after(flags.registry_fetch_timeout,
              lambda::bind(
@@ -306,7 +297,7 @@ void RegistrarProcess::_recover(
     const Future<Variable<Registry> >& recovery)
 {
   updating = false;
-  timers.state_fetch.stop();
+  metrics.state_fetch.stop();
 
   CHECK(!recovery.isPending());
 
@@ -404,7 +395,7 @@ void RegistrarProcess::update()
   }
 
   // Perform the store!
-  timers.state_store.start();
+  metrics.state_store.start();
   state->store(variable.get().mutate(registry))
     .after(flags.registry_store_timeout,
            lambda::bind(
@@ -424,7 +415,7 @@ void RegistrarProcess::_update(
     deque<Owned<Operation> > applied)
 {
   updating = false;
-  timers.state_store.stop();
+  metrics.state_store.stop();
 
   // Set the variable if the storage operation succeeded.
   if (!store.isReady()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/63595c23/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 8371df2..65779a7 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -1265,12 +1265,6 @@ TEST_F(MasterTest, MetricsInStatsEndpoint)
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
 
-  process::metrics::Counter counter("master/events");
-
-  AWAIT_READY(process::metrics::add(counter));
-
-  counter += 42;
-
   Future<process::http::Response> response =
     process::http::get(master.get(), "stats.json");
 
@@ -1286,9 +1280,20 @@ TEST_F(MasterTest, MetricsInStatsEndpoint)
 
   JSON::Object stats = parse.get();
 
-  EXPECT_EQ(42, stats.values["master/events"]);
+  EXPECT_EQ(1u, stats.values.count("master/dropped_messages"));
+
+  EXPECT_EQ(1u, stats.values.count("master/framework_registration_messages"));
+  EXPECT_EQ(1u, stats.values.count("master/framework_reregistration_messages"));
+
+  EXPECT_EQ(1u, stats.values.count("master/slave_registration_messages"));
+  EXPECT_EQ(1u, stats.values.count("master/slave_reregistration_messages"));
+
+  EXPECT_EQ(1u, stats.values.count("master/recovery_slave_removals"));
+
+  EXPECT_EQ(1u, stats.values.count("registrar/queued_operations"));
 
-  AWAIT_READY(process::metrics::remove(counter));
+  EXPECT_EQ(1u, stats.values.count("registrar/state_fetch_ms"));
+  EXPECT_EQ(1u, stats.values.count("registrar/state_store_ms"));
 
   Shutdown();
 }