You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/04/24 21:13:21 UTC
git commit: Added message counters and a counter for slave timeouts during recovery in Master.
Repository: mesos
Updated Branches:
refs/heads/master dd03b9f4a -> 63595c232
Added message counters and a counter for slave timeouts during recovery in Master.
Review: https://reviews.apache.org/r/20638
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/63595c23
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/63595c23
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/63595c23
Branch: refs/heads/master
Commit: 63595c232a58e97016789df5fcdf4893606d3e04
Parents: dd03b9f
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Wed Apr 23 14:58:55 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Thu Apr 24 11:53:08 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 12 ++++++++
src/master/master.hpp | 62 ++++++++++++++++++++++++++++++++++++++++-
src/master/registrar.cpp | 41 +++++++++++----------------
src/tests/master_tests.cpp | 21 ++++++++------
4 files changed, 102 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/63595c23/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e66d07e..71d7362 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -741,6 +741,7 @@ void Master::visit(const MessageEvent& event)
if (!elected()) {
LOG(WARNING) << "Dropping '" << event.message->name << "' message since "
<< "not elected yet";
+ ++metrics.dropped_messages;
return;
}
@@ -754,6 +755,7 @@ void Master::visit(const MessageEvent& event)
if (!recovered.get().isReady()) {
LOG(WARNING) << "Dropping '" << event.message->name << "' message since "
<< "not recovered yet";
+ ++metrics.dropped_messages;
return;
}
@@ -851,6 +853,8 @@ void Master::recoveredSlavesTimeout(const Registry& registry)
<< " (" << slave.info().hostname() << ") did not re-register "
<< "within the timeout; removing it from the registrar";
+ ++metrics.recovery_slave_removals;
+
slaves.recovered.erase(slave.info().id());
if (flags.registry_strict) {
@@ -969,6 +973,8 @@ void Master::registerFramework(
const UPID& from,
const FrameworkInfo& frameworkInfo)
{
+ ++metrics.framework_registration_messages;
+
if (authenticating.contains(from)) {
LOG(INFO) << "Queuing up registration request from " << from
<< " because authentication is still in progress";
@@ -1039,6 +1045,8 @@ void Master::reregisterFramework(
const FrameworkInfo& frameworkInfo,
bool failover)
{
+ ++metrics.framework_reregistration_messages;
+
if (authenticating.contains(from)) {
LOG(INFO) << "Queuing up re-registration request from " << from
<< " because authentication is still in progress";
@@ -2081,6 +2089,8 @@ void Master::schedulerMessage(
void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
{
+ ++metrics.slave_registration_messages;
+
// Check if this slave is already registered (because it retries).
foreachvalue (Slave* slave, slaves.activated) {
if (slave->pid == from) {
@@ -2167,6 +2177,8 @@ void Master::reregisterSlave(
const vector<Task>& tasks,
const vector<Archive::Framework>& completedFrameworks)
{
+ ++metrics.slave_reregistration_messages;
+
if (slaves.deactivated.get(slaveInfo.id()).isSome()) {
// To compensate for the case where a non-strict registrar is
// being used, we explicitly deny deactivated slaves from
http://git-wip-us.apache.org/repos/asf/mesos/blob/63595c23/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index f567a43..7c41de2 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -35,6 +35,9 @@
#include <process/protobuf.hpp>
#include <process/timer.hpp>
+#include <process/metrics/counter.hpp>
+#include <process/metrics/metrics.hpp>
+
#include <stout/cache.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
@@ -449,8 +452,10 @@ private:
int64_t nextOfferId; // Used to give each slot offer a unique ID.
int64_t nextSlaveId; // Used to give each slave a unique ID.
+ // TODO(bmahler): These are deprecated! Please use metrics instead.
// Statistics (initialized in Master::initialize).
- struct {
+ struct
+ {
uint64_t tasks[TaskState_ARRAYSIZE];
uint64_t validStatusUpdates;
uint64_t invalidStatusUpdates;
@@ -458,6 +463,61 @@ private:
uint64_t invalidFrameworkMessages;
} stats;
+ struct Metrics // Master counters; added to process::metrics in the ctor and removed in the dtor.
+ {
+ Metrics()
+ : dropped_messages(
+ "master/dropped_messages"),
+ framework_registration_messages(
+ "master/framework_registration_messages"),
+ framework_reregistration_messages(
+ "master/framework_reregistration_messages"),
+ slave_registration_messages(
+ "master/slave_registration_messages"),
+ slave_reregistration_messages(
+ "master/slave_reregistration_messages"),
+ recovery_slave_removals(
+ "master/recovery_slave_removals")
+ {
+ process::metrics::add(dropped_messages);
+
+ process::metrics::add(framework_registration_messages);
+ process::metrics::add(framework_reregistration_messages);
+
+ process::metrics::add(slave_registration_messages);
+ process::metrics::add(slave_reregistration_messages);
+
+ process::metrics::add(recovery_slave_removals);
+ }
+
+ ~Metrics()
+ {
+ process::metrics::remove(dropped_messages);
+
+ process::metrics::remove(framework_registration_messages);
+ process::metrics::remove(framework_reregistration_messages);
+
+ process::metrics::remove(slave_registration_messages);
+ process::metrics::remove(slave_reregistration_messages);
+
+ process::metrics::remove(recovery_slave_removals);
+ }
+
+ // Message counters; dropped_messages counts messages dropped while not elected or not yet recovered.
+ // TODO(bmahler): Add counters for other messages: kill task,
+ // status update, etc.
+ process::metrics::Counter dropped_messages;
+
+ process::metrics::Counter framework_registration_messages;
+ process::metrics::Counter framework_reregistration_messages;
+
+ process::metrics::Counter slave_registration_messages;
+ process::metrics::Counter slave_reregistration_messages;
+
+ // Recovery counters: slaves removed for not re-registering within the recovery timeout.
+ process::metrics::Counter recovery_slave_removals;
+ } metrics;
+
process::Time startTime; // Start time used to calculate uptime.
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/63595c23/src/master/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp
index b3ab572..79d4891 100644
--- a/src/master/registrar.cpp
+++ b/src/master/registrar.cpp
@@ -83,7 +83,7 @@ class RegistrarProcess : public Process<RegistrarProcess>
public:
RegistrarProcess(const Flags& _flags, State* _state)
: ProcessBase(process::ID::generate("registrar")),
- gauges(*this),
+ metrics(*this),
updating(false),
flags(_flags),
state(_state) {}
@@ -127,43 +127,34 @@ private:
};
// Metrics.
- struct Gauges
+ struct Metrics
{
- explicit Gauges(const RegistrarProcess& process)
+ explicit Metrics(const RegistrarProcess& process)
: queued_operations(
"registrar/queued_operations",
- defer(&process, &RegistrarProcess::_queued_operations))
+ defer(process, &RegistrarProcess::_queued_operations)),
+ state_fetch("registrar/state_fetch"),
+ state_store("registrar/state_store", Days(1))
{
process::metrics::add(queued_operations);
- }
-
- ~Gauges()
- {
- process::metrics::remove(queued_operations);
- }
-
- Gauge queued_operations;
- } gauges;
- struct Timers
- {
- Timers()
- : state_fetch("registrar/state_fetch"),
- state_store("registrar/state_store", Days(1))
- {
process::metrics::add(state_fetch);
process::metrics::add(state_store);
}
- ~Timers()
+ ~Metrics()
{
+ process::metrics::remove(queued_operations);
+
process::metrics::remove(state_fetch);
process::metrics::remove(state_store);
}
+ Gauge queued_operations;
+
Timer state_fetch;
Timer state_store;
- } timers;
+ } metrics;
// Gauge handlers
double _queued_operations()
@@ -284,7 +275,7 @@ Future<Registry> RegistrarProcess::recover(const MasterInfo& info)
LOG(INFO) << "Recovering registrar";
if (recovered.isNone()) {
- timers.state_fetch.start();
+ metrics.state_fetch.start();
state->fetch<Registry>("registry")
.after(flags.registry_fetch_timeout,
lambda::bind(
@@ -306,7 +297,7 @@ void RegistrarProcess::_recover(
const Future<Variable<Registry> >& recovery)
{
updating = false;
- timers.state_fetch.stop();
+ metrics.state_fetch.stop();
CHECK(!recovery.isPending());
@@ -404,7 +395,7 @@ void RegistrarProcess::update()
}
// Perform the store!
- timers.state_store.start();
+ metrics.state_store.start();
state->store(variable.get().mutate(registry))
.after(flags.registry_store_timeout,
lambda::bind(
@@ -424,7 +415,7 @@ void RegistrarProcess::_update(
deque<Owned<Operation> > applied)
{
updating = false;
- timers.state_store.stop();
+ metrics.state_store.stop();
// Set the variable if the storage operation succeeded.
if (!store.isReady()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/63595c23/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 8371df2..65779a7 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -1265,12 +1265,6 @@ TEST_F(MasterTest, MetricsInStatsEndpoint)
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
- process::metrics::Counter counter("master/events");
-
- AWAIT_READY(process::metrics::add(counter));
-
- counter += 42;
-
Future<process::http::Response> response =
process::http::get(master.get(), "stats.json");
@@ -1286,9 +1280,20 @@ TEST_F(MasterTest, MetricsInStatsEndpoint)
JSON::Object stats = parse.get();
- EXPECT_EQ(42, stats.values["master/events"]);
+ EXPECT_EQ(1u, stats.values.count("master/dropped_messages"));
+
+ EXPECT_EQ(1u, stats.values.count("master/framework_registration_messages"));
+ EXPECT_EQ(1u, stats.values.count("master/framework_reregistration_messages"));
+
+ EXPECT_EQ(1u, stats.values.count("master/slave_registration_messages"));
+ EXPECT_EQ(1u, stats.values.count("master/slave_reregistration_messages"));
+
+ EXPECT_EQ(1u, stats.values.count("master/recovery_slave_removals"));
+
+ EXPECT_EQ(1u, stats.values.count("registrar/queued_operations"));
- AWAIT_READY(process::metrics::remove(counter));
+ EXPECT_EQ(1u, stats.values.count("registrar/state_fetch_ms"));
+ EXPECT_EQ(1u, stats.values.count("registrar/state_store_ms"));
Shutdown();
}