You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/05/09 21:38:28 UTC
git commit: Ported some of the slave statistics to the new metrics library.
Repository: mesos
Updated Branches:
refs/heads/master 56eac2371 -> 5a8eaa2f4
Ported some of the slave statistics to the new metrics library.
Review: https://reviews.apache.org/r/19499
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5a8eaa2f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5a8eaa2f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5a8eaa2f
Branch: refs/heads/master
Commit: 5a8eaa2f442020e36173ae0d9480a60363fd76ec
Parents: 56eac23
Author: Dominic Hamon <dh...@twopensource.com>
Authored: Fri May 9 11:55:11 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri May 9 12:38:20 2014 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 74 ++++++++++++++++++++++++++++++++++++++++++
src/slave/slave.hpp | 44 ++++++++++++++++++++++++-
src/tests/slave_tests.cpp | 40 +++++++++++++++++++++++
3 files changed, 157 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5a8eaa2f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index ba4bd73..3a4ae38 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -37,6 +37,8 @@
#include <process/id.hpp>
#include <process/time.hpp>
+#include <process/metrics/metrics.hpp>
+
#include <stout/bytes.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
@@ -108,6 +110,7 @@ Slave::Slave(const slave::Flags& _flags,
detector(_detector),
containerizer(_containerizer),
files(_files),
+ metrics(*this),
monitor(containerizer),
statusUpdateManager(new StatusUpdateManager()),
metaDir(paths::getMetaRootDir(flags.work_dir)),
@@ -1385,6 +1388,7 @@ void Slave::schedulerMessage(
LOG(WARNING) << "Dropping message from framework "<< frameworkId
<< " because the slave is in " << state << " state";
stats.invalidFrameworkMessages++;
+ metrics.invalid_framework_messages++;
return;
}
@@ -1394,6 +1398,7 @@ void Slave::schedulerMessage(
LOG(WARNING) << "Dropping message from framework "<< frameworkId
<< " because framework does not exist";
stats.invalidFrameworkMessages++;
+ metrics.invalid_framework_messages++;
return;
}
@@ -1405,6 +1410,7 @@ void Slave::schedulerMessage(
LOG(WARNING) << "Dropping message from framework "<< frameworkId
<< " because framework is terminating";
stats.invalidFrameworkMessages++;
+ metrics.invalid_framework_messages++;
return;
}
@@ -1414,6 +1420,7 @@ void Slave::schedulerMessage(
<< executorId << "' of framework " << frameworkId
<< " because executor does not exist";
stats.invalidFrameworkMessages++;
+ metrics.invalid_framework_messages++;
return;
}
@@ -1429,6 +1436,7 @@ void Slave::schedulerMessage(
<< executorId << "' of framework " << frameworkId
<< " because executor is not running";
stats.invalidFrameworkMessages++;
+ metrics.invalid_framework_messages++;
break;
case Executor::RUNNING: {
FrameworkToExecutorMessage message;
@@ -1438,6 +1446,7 @@ void Slave::schedulerMessage(
message.set_data(data);
send(executor->pid, message);
stats.validFrameworkMessages++;
+ metrics.valid_framework_messages++;
break;
}
default:
@@ -1459,6 +1468,7 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
LOG(WARNING) << "Dropping updateFramework message for "<< frameworkId
<< " because the slave is in " << state << " state";
stats.invalidFrameworkMessages++;
+ metrics.invalid_framework_messages++;
return;
}
@@ -1962,6 +1972,7 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
LOG(WARNING) << "Ignoring status update " << update
<< " for unknown framework " << update.framework_id();
stats.invalidStatusUpdates++;
+ metrics.invalid_status_updates++;
return;
}
@@ -1975,6 +1986,7 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
LOG(WARNING) << "Ignoring status update " << update
<< " for terminating framework " << framework->id;
stats.invalidStatusUpdates++;
+ metrics.invalid_status_updates++;
return;
}
@@ -1983,6 +1995,7 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
LOG(WARNING) << "Could not find the executor for "
<< "status update " << update;
stats.validStatusUpdates++;
+ metrics.valid_status_updates++;
// NOTE: We forward the update here because this update could be
// generated by the slave when the executor is unknown to it
@@ -2018,6 +2031,7 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
stats.tasks[update.status().state()]++;
stats.validStatusUpdates++;
+ metrics.valid_status_updates++;
executor->updateTaskState(status);
@@ -2100,6 +2114,7 @@ void Slave::executorMessage(
<< executorId << " to framework " << frameworkId
<< " because the slave is in " << state << " state";
stats.invalidFrameworkMessages++;
+ metrics.invalid_framework_messages++;
return;
}
@@ -2109,6 +2124,7 @@ void Slave::executorMessage(
<< executorId << " to framework " << frameworkId
<< " because framework does not exist";
stats.invalidFrameworkMessages++;
+ metrics.invalid_framework_messages++;
return;
}
@@ -2121,6 +2137,7 @@ void Slave::executorMessage(
<< executorId << " to framework " << frameworkId
<< " because framework is terminating";
stats.invalidFrameworkMessages++;
+ metrics.invalid_framework_messages++;
return;
}
@@ -2136,6 +2153,7 @@ void Slave::executorMessage(
send(framework->pid, message);
stats.validFrameworkMessages++;
+ metrics.valid_framework_messages++;
}
@@ -2844,6 +2862,7 @@ Future<Nothing> Slave::recover(const Result<SlaveState>& _state)
info = state.get().info.get(); // Recover the slave info.
recoveryErrors = state.get().errors;
+ metrics.recovery_errors += state.get().errors;
if (recoveryErrors > 0) {
LOG(WARNING) << "Errors encountered during recovery: " << recoveryErrors;
}
@@ -3062,6 +3081,61 @@ Future<Nothing> Slave::garbageCollect(const string& path)
}
+// TODO(dhamon): Consider adding a metrics.cpp for definitions.
+//
+// Constructs every slave metric under the "slave/" name prefix and
+// registers each one with process::metrics::add().
+//
+// The three gauges (uptime_secs, registered, active_frameworks) are
+// backed by the Slave's private _uptime_secs(), _registered() and
+// _active_frameworks() methods wrapped in defer(slave, ...); 'slave'
+// is used for nothing else here. NOTE(review): presumably defer()
+// makes each gauge read execute on the slave's own process rather
+// than the reader's -- confirm against libprocess.
+//
+// The counters are only named here; the Slave increments them
+// directly at the points where it updates its legacy 'stats'.
+Slave::Metrics::Metrics(const Slave& slave)
+ : uptime_secs(
+ "slave/uptime_secs",
+ defer(slave, &Slave::_uptime_secs)),
+ registered(
+ "slave/registered",
+ defer(slave, &Slave::_registered)),
+ recovery_errors(
+ "slave/recovery_errors"),
+ active_frameworks(
+ "slave/active_frameworks",
+ defer(slave, &Slave::_active_frameworks)),
+ valid_status_updates(
+ "slave/valid_status_updates"),
+ invalid_status_updates(
+ "slave/invalid_status_updates"),
+ valid_framework_messages(
+ "slave/valid_framework_messages"),
+ invalid_framework_messages(
+ "slave/invalid_framework_messages")
+{
+ // TODO(dhamon): Check return values for metric registration.
+ process::metrics::add(uptime_secs);
+ process::metrics::add(registered);
+
+ process::metrics::add(recovery_errors);
+
+ process::metrics::add(active_frameworks);
+
+ process::metrics::add(valid_status_updates);
+ process::metrics::add(invalid_status_updates);
+
+ process::metrics::add(valid_framework_messages);
+ process::metrics::add(invalid_framework_messages);
+}
+
+
+// Unregisters, via process::metrics::remove(), each of the eight
+// metrics that the Metrics constructor added. The removal order
+// differs from the registration order, but every metric is removed
+// exactly once. Return values are currently ignored (see TODO).
+Slave::Metrics::~Metrics()
+{
+ // TODO(dhamon): Check return values of unregistered metrics
+ process::metrics::remove(valid_status_updates);
+ process::metrics::remove(invalid_status_updates);
+
+ process::metrics::remove(valid_framework_messages);
+ process::metrics::remove(invalid_framework_messages);
+
+ process::metrics::remove(active_frameworks);
+ process::metrics::remove(registered);
+ process::metrics::remove(uptime_secs);
+
+ process::metrics::remove(recovery_errors);
+}
+
+
Framework::Framework(
Slave* _slave,
const FrameworkID& _id,
http://git-wip-us.apache.org/repos/asf/mesos/blob/5a8eaa2f/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index ed20dca..a6efad4 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -35,6 +35,9 @@
#include <process/process.hpp>
#include <process/protobuf.hpp>
+#include <process/metrics/counter.hpp>
+#include <process/metrics/gauge.hpp>
+
#include <stout/bytes.hpp>
#include <stout/linkedhashmap.hpp>
#include <stout/hashmap.hpp>
@@ -331,6 +334,22 @@ private:
Slave(const Slave&); // No copying.
Slave& operator = (const Slave&); // No assigning.
+ // Gauge methods.
+ // These back the Gauge members of Slave::Metrics; the Metrics
+ // constructor binds each one with defer(slave, &Slave::...).
+
+ // Number of entries in 'frameworks', reported as a double for the
+ // "slave/active_frameworks" gauge.
+ double _active_frameworks()
+ {
+ return frameworks.size();
+ }
+
+ // Seconds elapsed since 'startTime', per Clock::now().
+ // NOTE(review): only meaningful once 'startTime' has been assigned;
+ // confirm it is set before the first sample can be taken.
+ double _uptime_secs()
+ {
+ return (Clock::now() - startTime).secs();
+ }
+
+ // 1 when 'master' holds a value, 0 otherwise. Presumably this means
+ // "the slave currently knows its master" -- verify against the
+ // places that set/clear 'master'.
+ double _registered()
+ {
+ return master.isSome() ? 1 : 0;
+ }
+
const Flags flags;
SlaveInfo info;
@@ -351,7 +370,8 @@ private:
Files* files;
// Statistics (initialized in Slave::initialize).
- struct {
+ struct
+ {
uint64_t tasks[TaskState_ARRAYSIZE];
uint64_t validStatusUpdates;
uint64_t invalidStatusUpdates;
@@ -359,6 +379,28 @@ private:
uint64_t invalidFrameworkMessages;
} stats;
+ // Metrics exported by the slave through the process::metrics
+ // library. Every member is constructed with the name
+ // "slave/<member name>" and registered in Metrics(); ~Metrics()
+ // unregisters all of them (both are defined in slave.cpp).
+ struct Metrics
+ {
+ // 'slave' is only used as the target of the deferred gauge
+ // callbacks. NOTE(review): consider marking this constructor
+ // explicit to avoid an implicit Slave -> Metrics conversion.
+ Metrics(const Slave& slave);
+
+ ~Metrics();
+
+ // Gauges sampled from Slave::_uptime_secs() and
+ // Slave::_registered().
+ process::metrics::Gauge uptime_secs;
+ process::metrics::Gauge registered;
+
+ // Incremented by the error count found in the recovered slave
+ // state during Slave::recover().
+ process::metrics::Counter recovery_errors;
+
+ // Gauge sampled from Slave::_active_frameworks().
+ process::metrics::Gauge active_frameworks;
+
+ // TODO(dhamon): Add tasks Gauges.
+
+ // Counterparts of stats.validStatusUpdates and
+ // stats.invalidStatusUpdates; incremented alongside them.
+ process::metrics::Counter valid_status_updates;
+ process::metrics::Counter invalid_status_updates;
+
+ // Counterparts of stats.validFrameworkMessages and
+ // stats.invalidFrameworkMessages; incremented alongside them.
+ process::metrics::Counter valid_framework_messages;
+ process::metrics::Counter invalid_framework_messages;
+ } metrics;
+
process::Time startTime;
GarbageCollector gc;
http://git-wip-us.apache.org/repos/asf/mesos/blob/5a8eaa2f/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 6526952..458356d 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -405,3 +405,43 @@ TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithUser)
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
+
+
+// Starts a master and a slave, fetches the slave's "stats.json"
+// endpoint, and checks that the response is JSON containing each of
+// the eight "slave/..." metric names. Only the presence of each key
+// is asserted, not its value.
+TEST_F(SlaveTest, MetricsInStatsEndpoint)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ Try<PID<Slave> > slave = StartSlave();
+ ASSERT_SOME(slave);
+
+ Future<process::http::Response> response =
+ process::http::get(slave.get(), "stats.json");
+
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ(
+ "application/json",
+ response.get().headers.get("Content-Type"));
+
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+
+ ASSERT_SOME(parse);
+
+ JSON::Object stats = parse.get();
+
+ // Each metric name registered by Slave::Metrics must appear once.
+ EXPECT_EQ(1u, stats.values.count("slave/uptime_secs"));
+ EXPECT_EQ(1u, stats.values.count("slave/registered"));
+
+ EXPECT_EQ(1u, stats.values.count("slave/recovery_errors"));
+
+ EXPECT_EQ(1u, stats.values.count("slave/active_frameworks"));
+
+ EXPECT_EQ(1u, stats.values.count("slave/valid_status_updates"));
+ EXPECT_EQ(1u, stats.values.count("slave/invalid_status_updates"));
+
+ EXPECT_EQ(1u, stats.values.count("slave/valid_framework_messages"));
+ EXPECT_EQ(1u, stats.values.count("slave/invalid_framework_messages"));
+
+ Shutdown();
+}