You are viewing a plain-text version of this content. (The link to its canonical version that accompanied this notice was not preserved in this copy.)
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/05/09 21:38:28 UTC

git commit: Ported some of the slave statistics to the new metrics library.

Repository: mesos
Updated Branches:
  refs/heads/master 56eac2371 -> 5a8eaa2f4


Ported some of the slave statistics to the new metrics library.

Review: https://reviews.apache.org/r/19499


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5a8eaa2f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5a8eaa2f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5a8eaa2f

Branch: refs/heads/master
Commit: 5a8eaa2f442020e36173ae0d9480a60363fd76ec
Parents: 56eac23
Author: Dominic Hamon <dh...@twopensource.com>
Authored: Fri May 9 11:55:11 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri May 9 12:38:20 2014 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp       | 74 ++++++++++++++++++++++++++++++++++++++++++
 src/slave/slave.hpp       | 44 ++++++++++++++++++++++++-
 src/tests/slave_tests.cpp | 40 +++++++++++++++++++++++
 3 files changed, 157 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5a8eaa2f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index ba4bd73..3a4ae38 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -37,6 +37,8 @@
 #include <process/id.hpp>
 #include <process/time.hpp>
 
+#include <process/metrics/metrics.hpp>
+
 #include <stout/bytes.hpp>
 #include <stout/check.hpp>
 #include <stout/duration.hpp>
@@ -108,6 +110,7 @@ Slave::Slave(const slave::Flags& _flags,
     detector(_detector),
     containerizer(_containerizer),
     files(_files),
+    metrics(*this),
     monitor(containerizer),
     statusUpdateManager(new StatusUpdateManager()),
     metaDir(paths::getMetaRootDir(flags.work_dir)),
@@ -1385,6 +1388,7 @@ void Slave::schedulerMessage(
     LOG(WARNING) << "Dropping message from framework "<< frameworkId
                  << " because the slave is in " << state << " state";
     stats.invalidFrameworkMessages++;
+    metrics.invalid_framework_messages++;
     return;
   }
 
@@ -1394,6 +1398,7 @@ void Slave::schedulerMessage(
     LOG(WARNING) << "Dropping message from framework "<< frameworkId
                  << " because framework does not exist";
     stats.invalidFrameworkMessages++;
+    metrics.invalid_framework_messages++;
     return;
   }
 
@@ -1405,6 +1410,7 @@ void Slave::schedulerMessage(
     LOG(WARNING) << "Dropping message from framework "<< frameworkId
                  << " because framework is terminating";
     stats.invalidFrameworkMessages++;
+    metrics.invalid_framework_messages++;
     return;
   }
 
@@ -1414,6 +1420,7 @@ void Slave::schedulerMessage(
                  << executorId << "' of framework " << frameworkId
                  << " because executor does not exist";
     stats.invalidFrameworkMessages++;
+    metrics.invalid_framework_messages++;
     return;
   }
 
@@ -1429,6 +1436,7 @@ void Slave::schedulerMessage(
                    << executorId << "' of framework " << frameworkId
                    << " because executor is not running";
       stats.invalidFrameworkMessages++;
+      metrics.invalid_framework_messages++;
       break;
     case Executor::RUNNING: {
       FrameworkToExecutorMessage message;
@@ -1438,6 +1446,7 @@ void Slave::schedulerMessage(
       message.set_data(data);
       send(executor->pid, message);
       stats.validFrameworkMessages++;
+      metrics.valid_framework_messages++;
       break;
     }
     default:
@@ -1459,6 +1468,7 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
     LOG(WARNING) << "Dropping updateFramework message for "<< frameworkId
                  << " because the slave is in " << state << " state";
     stats.invalidFrameworkMessages++;
+    metrics.invalid_framework_messages++;
     return;
   }
 
@@ -1962,6 +1972,7 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
     LOG(WARNING) << "Ignoring status update " << update
                  << " for unknown framework " << update.framework_id();
     stats.invalidStatusUpdates++;
+    metrics.invalid_status_updates++;
     return;
   }
 
@@ -1975,6 +1986,7 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
     LOG(WARNING) << "Ignoring status update " << update
                  << " for terminating framework " << framework->id;
     stats.invalidStatusUpdates++;
+    metrics.invalid_status_updates++;
     return;
   }
 
@@ -1983,6 +1995,7 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
     LOG(WARNING)  << "Could not find the executor for "
                   << "status update " << update;
     stats.validStatusUpdates++;
+    metrics.valid_status_updates++;
 
     // NOTE: We forward the update here because this update could be
     // generated by the slave when the executor is unknown to it
@@ -2018,6 +2031,7 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
 
   stats.tasks[update.status().state()]++;
   stats.validStatusUpdates++;
+  metrics.valid_status_updates++;
 
   executor->updateTaskState(status);
 
@@ -2100,6 +2114,7 @@ void Slave::executorMessage(
                  << executorId << " to framework " << frameworkId
                  << " because the slave is in " << state << " state";
     stats.invalidFrameworkMessages++;
+    metrics.invalid_framework_messages++;
     return;
   }
 
@@ -2109,6 +2124,7 @@ void Slave::executorMessage(
                  << executorId << " to framework " << frameworkId
                  << " because framework does not exist";
     stats.invalidFrameworkMessages++;
+    metrics.invalid_framework_messages++;
     return;
   }
 
@@ -2121,6 +2137,7 @@ void Slave::executorMessage(
                  << executorId << " to framework " << frameworkId
                  << " because framework is terminating";
     stats.invalidFrameworkMessages++;
+    metrics.invalid_framework_messages++;
     return;
   }
 
@@ -2136,6 +2153,7 @@ void Slave::executorMessage(
   send(framework->pid, message);
 
   stats.validFrameworkMessages++;
+  metrics.valid_framework_messages++;
 }
 
 
@@ -2844,6 +2862,7 @@ Future<Nothing> Slave::recover(const Result<SlaveState>& _state)
     info = state.get().info.get(); // Recover the slave info.
 
     recoveryErrors = state.get().errors;
+    metrics.recovery_errors += state.get().errors;
     if (recoveryErrors > 0) {
       LOG(WARNING) << "Errors encountered during recovery: " << recoveryErrors;
     }
@@ -3062,6 +3081,61 @@ Future<Nothing> Slave::garbageCollect(const string& path)
 }
 
 
+// Constructs every slave metric and registers it with libprocess. The
+// gauges obtain their values by deferring calls to the Slave::_uptime_secs,
+// Slave::_registered and Slave::_active_frameworks methods onto 'slave'.
+// TODO(dhamon): Consider adding a metrics.cpp for definitions.
+Slave::Metrics::Metrics(const Slave& slave)
+  : uptime_secs("slave/uptime_secs", defer(slave, &Slave::_uptime_secs)),
+    registered("slave/registered", defer(slave, &Slave::_registered)),
+    recovery_errors("slave/recovery_errors"),
+    active_frameworks(
+        "slave/active_frameworks",
+        defer(slave, &Slave::_active_frameworks)),
+    valid_status_updates("slave/valid_status_updates"),
+    invalid_status_updates("slave/invalid_status_updates"),
+    valid_framework_messages("slave/valid_framework_messages"),
+    invalid_framework_messages("slave/invalid_framework_messages")
+{
+  // TODO(dhamon): Check return values for metric registration.
+
+  // Uptime and master-registration gauges.
+  process::metrics::add(uptime_secs);
+  process::metrics::add(registered);
+
+  // Increased by Slave::recover() by the number of recovery errors.
+  process::metrics::add(recovery_errors);
+
+  // Number of entries in Slave::frameworks.
+  process::metrics::add(active_frameworks);
+
+  // Status updates: valid (processed) vs. invalid (ignored).
+  process::metrics::add(valid_status_updates);
+  process::metrics::add(invalid_status_updates);
+
+  // Framework messages: valid (forwarded) vs. invalid (dropped).
+  process::metrics::add(valid_framework_messages);
+  process::metrics::add(invalid_framework_messages);
+}
+
+
+// Unregisters every metric added by the constructor, in reverse order.
+Slave::Metrics::~Metrics()
+{
+  // TODO(dhamon): Check return values when removing metrics.
+  process::metrics::remove(invalid_framework_messages);
+  process::metrics::remove(valid_framework_messages);
+  process::metrics::remove(invalid_status_updates);
+  process::metrics::remove(valid_status_updates);
+
+  process::metrics::remove(active_frameworks);
+  process::metrics::remove(recovery_errors);
+
+  process::metrics::remove(registered);
+  process::metrics::remove(uptime_secs);
+}
+
+
 Framework::Framework(
     Slave* _slave,
     const FrameworkID& _id,

http://git-wip-us.apache.org/repos/asf/mesos/blob/5a8eaa2f/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index ed20dca..a6efad4 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -35,6 +35,9 @@
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 
+#include <process/metrics/counter.hpp>
+#include <process/metrics/gauge.hpp>
+
 #include <stout/bytes.hpp>
 #include <stout/linkedhashmap.hpp>
 #include <stout/hashmap.hpp>
@@ -331,6 +334,22 @@ private:
   Slave(const Slave&);              // No copying.
   Slave& operator = (const Slave&); // No assigning.
 
+  // Gauge methods.
+  double _active_frameworks()
+  {
+    return static_cast<double>(frameworks.size());  // Size of 'frameworks'.
+  }
+
+  double _uptime_secs()
+  {
+    return (Clock::now() - startTime).secs();  // Seconds since 'startTime'.
+  }
+
+  double _registered()
+  {
+    return master.isSome() ? 1.0 : 0.0;  // 1 iff 'master' is set.
+  }
+
   const Flags flags;
 
   SlaveInfo info;
@@ -351,7 +370,8 @@ private:
   Files* files;
 
   // Statistics (initialized in Slave::initialize).
-  struct {
+  struct
+  {
     uint64_t tasks[TaskState_ARRAYSIZE];
     uint64_t validStatusUpdates;
     uint64_t invalidStatusUpdates;
@@ -359,6 +379,28 @@ private:
     uint64_t invalidFrameworkMessages;
   } stats;
 
+  // Registered with libprocess on construction and removed on destruction,
+  // so copying is disallowed (a copy's destructor would remove them again).
+  struct Metrics
+  {
+    explicit Metrics(const Slave& slave);
+    ~Metrics();
+
+    process::metrics::Gauge uptime_secs;
+    process::metrics::Gauge registered;
+    process::metrics::Counter recovery_errors;
+    process::metrics::Gauge active_frameworks;
+    // TODO(dhamon): Add tasks Gauges.
+
+    process::metrics::Counter valid_status_updates;
+    process::metrics::Counter invalid_status_updates;
+    process::metrics::Counter valid_framework_messages;
+    process::metrics::Counter invalid_framework_messages;
+  private:
+    Metrics(const Metrics&);              // No copying.
+    Metrics& operator = (const Metrics&); // No assigning.
+  } metrics;
+
   process::Time startTime;
 
   GarbageCollector gc;

http://git-wip-us.apache.org/repos/asf/mesos/blob/5a8eaa2f/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 6526952..458356d 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -405,3 +405,43 @@ TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithUser)
 
   Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
 }
+
+
+// Checks that every metric registered by the slave is reported by its
+// "stats.json" endpoint.
+TEST_F(SlaveTest, MetricsInStatsEndpoint)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave> > slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  Future<process::http::Response> response =
+    process::http::get(slave.get(), "stats.json");
+  AWAIT_READY(response);
+  EXPECT_SOME_EQ(
+      "application/json",
+      response.get().headers.get("Content-Type"));
+
+  Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(parse);
+  const JSON::Object stats = parse.get();
+
+  const char* const keys[] = {
+    "slave/uptime_secs",
+    "slave/registered",
+    "slave/recovery_errors",
+    "slave/active_frameworks",
+    "slave/valid_status_updates",
+    "slave/invalid_status_updates",
+    "slave/valid_framework_messages",
+    "slave/invalid_framework_messages"
+  };
+
+  for (size_t i = 0; i < sizeof(keys) / sizeof(keys[0]); ++i) {
+    EXPECT_EQ(1u, stats.values.count(keys[i])) << keys[i];
+  }
+
+  Shutdown();
+}