You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/02/05 01:28:06 UTC

[1/3] mesos git commit: Moved framework related rate limiters into Master::Frameworks.

Repository: mesos
Updated Branches:
  refs/heads/master b05e2f0bb -> 3048e5e16


Moved framework related rate limiters into Master::Frameworks.

Review: https://reviews.apache.org/r/30511


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fafccbd9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fafccbd9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fafccbd9

Branch: refs/heads/master
Commit: fafccbd9dcb92d4db27bc272d7443ac6ebecbae8
Parents: b05e2f0
Author: Vinod Kone <vi...@gmail.com>
Authored: Thu Jan 29 12:44:53 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Feb 4 16:27:48 2015 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 71 ++++++++++++++++++++++++++++++----------------
 src/master/master.hpp | 35 +++++++----------------
 2 files changed, 57 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fafccbd9/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 46caa55..d5c4beb 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -257,6 +257,25 @@ Master::Master(
 Master::~Master() {}
 
 
+// TODO(vinod): Update this interface to return failed futures when
+// capacity is reached.
+struct BoundedRateLimiter
+{
+  BoundedRateLimiter(double qps, Option<uint64_t> _capacity)
+    : limiter(new process::RateLimiter(qps)),
+      capacity(_capacity),
+      messages(0) {}
+
+  process::Owned<process::RateLimiter> limiter;
+  const Option<uint64_t> capacity;
+
+  // Number of outstanding messages for this RateLimiter.
+  // NOTE: ExitedEvents are throttled but not counted towards
+  // the capacity here.
+  uint64_t messages;
+};
+
+
 void Master::initialize()
 {
   LOG(INFO) << "Master " << info_.id() << " (" << info_.hostname() << ")"
@@ -360,7 +379,7 @@ void Master::initialize()
   if (flags.rate_limits.isSome()) {
     // Add framework rate limiters.
     foreach (const RateLimit& limit_, flags.rate_limits.get().limits()) {
-      if (limiters.contains(limit_.principal())) {
+      if (frameworks.limiters.contains(limit_.principal())) {
         EXIT(1) << "Duplicate principal " << limit_.principal()
                 << " found in RateLimits configuration";
       }
@@ -375,12 +394,12 @@ void Master::initialize()
         if (limit_.has_capacity()) {
           capacity = limit_.capacity();
         }
-        limiters.put(
+        frameworks.limiters.put(
             limit_.principal(),
             Owned<BoundedRateLimiter>(
                 new BoundedRateLimiter(limit_.qps(), capacity)));
       } else {
-        limiters.put(limit_.principal(), None());
+        frameworks.limiters.put(limit_.principal(), None());
       }
     }
 
@@ -396,7 +415,7 @@ void Master::initialize()
       if (flags.rate_limits.get().has_aggregate_default_capacity()) {
         capacity = flags.rate_limits.get().aggregate_default_capacity();
       }
-      defaultLimiter = Owned<BoundedRateLimiter>(
+      frameworks.defaultLimiter = Owned<BoundedRateLimiter>(
           new BoundedRateLimiter(
               flags.rate_limits.get().aggregate_default_qps(), capacity));
     }
@@ -905,9 +924,10 @@ void Master::visit(const MessageEvent& event)
   //    above. (or)
   // 2) the principal exists in RateLimits but 'qps' is not set.
   if (principal.isSome() &&
-      limiters.contains(principal.get()) &&
-      limiters[principal.get()].isSome()) {
-    const Owned<BoundedRateLimiter>& limiter = limiters[principal.get()].get();
+      frameworks.limiters.contains(principal.get()) &&
+      frameworks.limiters[principal.get()].isSome()) {
+    const Owned<BoundedRateLimiter>& limiter =
+      frameworks.limiters[principal.get()].get();
 
     if (limiter->capacity.isNone() ||
         limiter->messages < limiter->capacity.get()) {
@@ -920,19 +940,21 @@ void Master::visit(const MessageEvent& event)
           principal,
           limiter->capacity.get());
     }
-  } else if ((principal.isNone() || !limiters.contains(principal.get())) &&
+  } else if ((principal.isNone() ||
+              !frameworks.limiters.contains(principal.get())) &&
              isRegisteredFramework &&
-             defaultLimiter.isSome()) {
-    if (defaultLimiter.get()->capacity.isNone() ||
-        defaultLimiter.get()->messages < defaultLimiter.get()->capacity.get()) {
-      defaultLimiter.get()->messages++;
-      defaultLimiter.get()->limiter->acquire()
+             frameworks.defaultLimiter.isSome()) {
+    if (frameworks.defaultLimiter.get()->capacity.isNone() ||
+        frameworks.defaultLimiter.get()->messages <
+          frameworks.defaultLimiter.get()->capacity.get()) {
+      frameworks.defaultLimiter.get()->messages++;
+      frameworks.defaultLimiter.get()->limiter->acquire()
         .onReady(defer(self(), &Self::throttled, event, None()));
     } else {
       exceededCapacity(
           event,
           principal,
-          defaultLimiter.get()->capacity.get());
+          frameworks.defaultLimiter.get()->capacity.get());
     }
   } else {
     _visit(event);
@@ -957,14 +979,15 @@ void Master::visit(const ExitedEvent& event)
   typedef void(Self::*F)(const ExitedEvent&);
 
   if (principal.isSome() &&
-      limiters.contains(principal.get()) &&
-      limiters[principal.get()].isSome()) {
-    limiters[principal.get()].get()->limiter->acquire()
+      frameworks.limiters.contains(principal.get()) &&
+      frameworks.limiters[principal.get()].isSome()) {
+    frameworks.limiters[principal.get()].get()->limiter->acquire()
       .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
-  } else if ((principal.isNone() || !limiters.contains(principal.get())) &&
+  } else if ((principal.isNone() ||
+              !frameworks.limiters.contains(principal.get())) &&
              isRegisteredFramework &&
-             defaultLimiter.isSome()) {
-    defaultLimiter.get()->limiter->acquire()
+             frameworks.defaultLimiter.isSome()) {
+    frameworks.defaultLimiter.get()->limiter->acquire()
       .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
   } else {
     _visit(event);
@@ -979,11 +1002,11 @@ void Master::throttled(
   // We already know a RateLimiter is used to throttle this event so
   // here we only need to determine which.
   if (principal.isSome()) {
-    CHECK_SOME(limiters[principal.get()]);
-    limiters[principal.get()].get()->messages--;
+    CHECK_SOME(frameworks.limiters[principal.get()]);
+    frameworks.limiters[principal.get()].get()->messages--;
   } else {
-    CHECK_SOME(defaultLimiter);
-    defaultLimiter.get()->messages--;
+    CHECK_SOME(frameworks.defaultLimiter);
+    frameworks.defaultLimiter.get()->messages--;
   }
 
   _visit(event);

http://git-wip-us.apache.org/repos/asf/mesos/blob/fafccbd9/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 9d8c508..d98e1b6 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -86,6 +86,7 @@ class Allocator;
 class Repairer;
 class SlaveObserver;
 
+struct BoundedRateLimiter;
 struct Framework;
 struct Role;
 struct Slave;
@@ -615,6 +616,15 @@ private:
     //    allows them) if they have principals specified in
     //    FrameworkInfo.
     hashmap<process::UPID, Option<std::string>> principals;
+
+    // BoundedRateLimiters keyed by the framework principal.
+    // Like Metrics::Frameworks, all frameworks of the same principal
+    // are throttled together at a common rate limit.
+    hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters;
+
+    // The default limiter is for frameworks not specified in
+    // 'flags.rate_limits'.
+    Option<process::Owned<BoundedRateLimiter>> defaultLimiter;
   } frameworks;
 
   hashmap<OfferID, Offer*> offers;
@@ -712,31 +722,6 @@ private:
   process::Future<Option<Error>> validate(
       const FrameworkInfo& frameworkInfo,
       const process::UPID& from);
-
-  struct BoundedRateLimiter
-  {
-    BoundedRateLimiter(double qps, Option<uint64_t> _capacity)
-      : limiter(new process::RateLimiter(qps)),
-        capacity(_capacity),
-        messages(0) {}
-
-    process::Owned<process::RateLimiter> limiter;
-    const Option<uint64_t> capacity;
-
-    // Number of outstanding messages for this RateLimiter.
-    // NOTE: ExitedEvents are throttled but not counted towards
-    // the capacity here.
-    uint64_t messages;
-  };
-
-  // BoundedRateLimiters keyed by the framework principal.
-  // Like Metrics::Frameworks, all frameworks of the same principal
-  // are throttled together at a common rate limit.
-  hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters;
-
-  // The default limiter is for frameworks not specified in
-  // 'flags.rate_limits'.
-  Option<process::Owned<BoundedRateLimiter>> defaultLimiter;
 };
 
 


[2/3] mesos git commit: Rate limited the removal of slaves failing health checks.

Posted by vi...@apache.org.
Rate limited the removal of slaves failing health checks.

Review: https://reviews.apache.org/r/30514


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/886efefc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/886efefc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/886efefc

Branch: refs/heads/master
Commit: 886efefc5f294b3ea22c1fa2ce70a9e9324eca19
Parents: fafccbd
Author: Vinod Kone <vi...@gmail.com>
Authored: Thu Jan 29 11:51:02 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Feb 4 16:27:49 2015 -0800

----------------------------------------------------------------------
 src/master/flags.hpp          |  10 ++
 src/master/master.cpp         | 104 +++++++++++++++++--
 src/master/master.hpp         |   6 ++
 src/tests/partition_tests.cpp |   2 +-
 src/tests/slave_tests.cpp     | 204 +++++++++++++++++++++++++++++++------
 5 files changed, 287 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index 6c18a1a..e9f6fff 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -147,6 +147,15 @@ public:
         "Values: [0%-100%]",
         stringify(RECOVERY_SLAVE_REMOVAL_PERCENT_LIMIT * 100.0) + "%");
 
+    // TODO(vinod): Add a 'Rate' abstraction in stout and the
+    // corresponding parser for flags.
+    add(&Flags::slave_removal_rate_limit,
+        "slave_removal_rate_limit",
+        "The maximum rate (e.g., 1/10mins, 2/3hrs, etc) at which slaves will\n"
+        "be removed from the master when they fail health checks. By default\n"
+        "slaves will be removed as soon as they fail the health checks.\n"
+        "The value is of the form <Number of slaves>/<Duration>.");
+
     add(&Flags::webui_dir,
         "webui_dir",
         "Directory path of the webui files/assets",
@@ -377,6 +386,7 @@ public:
   bool log_auto_initialize;
   Duration slave_reregister_timeout;
   std::string recovery_slave_removal_limit;
+  Option<std::string> slave_removal_rate_limit;
   std::string webui_dir;
   std::string whitelist;
   std::string user_sorter;

http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d5c4beb..e42b922 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -94,6 +94,7 @@ using process::Owned;
 using process::PID;
 using process::Process;
 using process::Promise;
+using process::RateLimiter;
 using process::Shared;
 using process::Time;
 using process::Timer;
@@ -116,12 +117,14 @@ public:
   SlaveObserver(const UPID& _slave,
                 const SlaveInfo& _slaveInfo,
                 const SlaveID& _slaveId,
-                const PID<Master>& _master)
+                const PID<Master>& _master,
+                const Option<shared_ptr<RateLimiter>>& _limiter)
     : ProcessBase(process::ID::generate("slave-observer")),
       slave(_slave),
       slaveInfo(_slaveInfo),
       slaveId(_slaveId),
       master(_master),
+      limiter(_limiter),
       timeouts(0),
       pinged(false),
       connected(true)
@@ -167,23 +170,75 @@ protected:
   {
     timeouts = 0;
     pinged = false;
+
+    // Cancel any pending shutdown.
+    if (shuttingDown.isSome()) {
+      // Need a copy for non-const access.
+      Future<Nothing> future = shuttingDown.get();
+      future.discard();
+    }
   }
 
   void timeout()
   {
-    if (pinged) { // So we haven't got back a pong yet ...
-      if (++timeouts >= MAX_SLAVE_PING_TIMEOUTS) {
+    if (pinged) {
+      timeouts++; // No pong has been received before the timeout.
+      if (timeouts >= MAX_SLAVE_PING_TIMEOUTS) {
+        // No pong has been received for the last
+        // 'MAX_SLAVE_PING_TIMEOUTS' pings.
         shutdown();
-        return;
       }
     }
 
+    // NOTE: We keep pinging even if we schedule a shutdown. This is
+    // because if the slave eventually responds to a ping, we can
+    // cancel the shutdown.
     ping();
   }
 
+  // NOTE: The shutdown of the slave is rate limited and can be
+  // canceled if a pong was received before the actual shutdown is
+  // called.
   void shutdown()
   {
-    dispatch(master, &Master::shutdownSlave, slaveId, "health check timed out");
+    if (shuttingDown.isSome()) {
+      return;  // Shutdown is already in progress.
+    }
+
+    Future<Nothing> acquire = Nothing();
+
+    if (limiter.isSome()) {
+      LOG(INFO) << "Scheduling shutdown of slave " << slaveId
+                << " due to health check timeout";
+
+      acquire = limiter.get()->acquire();
+    }
+
+    shuttingDown = acquire.onAny(defer(self(), &Self::_shutdown));
+  }
+
+  void _shutdown()
+  {
+    CHECK_SOME(shuttingDown);
+
+    const Future<Nothing>& future = shuttingDown.get();
+
+    CHECK(!future.isFailed());
+
+    if (future.isReady()) {
+      LOG(INFO) << "Shutting down slave " << slaveId
+                << " due to health check timeout";
+
+      dispatch(master,
+               &Master::shutdownSlave,
+               slaveId,
+               "health check timed out");
+    } else if (future.isDiscarded()) {
+      LOG(INFO) << "Canceling shutdown of slave " << slaveId
+                << " since a pong is received!";
+    }
+
+    shuttingDown = None();
   }
 
 private:
@@ -191,6 +246,8 @@ private:
   const SlaveInfo slaveInfo;
   const SlaveID slaveId;
   const PID<Master> master;
+  const Option<shared_ptr<RateLimiter>> limiter;
+  Option<Future<Nothing>> shuttingDown;
   uint32_t timeouts;
   bool pinged;
   bool connected;
@@ -423,6 +480,41 @@ void Master::initialize()
     LOG(INFO) << "Framework rate limiting enabled";
   }
 
+  if (flags.slave_removal_rate_limit.isSome()) {
+    LOG(INFO) << "Slave removal is rate limited to "
+              << flags.slave_removal_rate_limit.get();
+
+    // Parse the flag value.
+    // TODO(vinod): Move this parsing logic to flags once we have a
+    // 'Rate' abstraction in stout.
+    vector<string> tokens =
+      strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+    if (tokens.size() != 2) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>";
+    }
+
+    Try<int> permits = numify<int>(tokens[0]);
+    if (permits.isError()) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>"
+              << ": " << permits.error();
+    }
+
+    Try<Duration> duration = Duration::parse(tokens[1]);
+    if (duration.isError()) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>"
+              << ": " << duration.error();
+    }
+
+    slaves.limiter = new RateLimiter(permits.get(), duration.get());
+  }
+
   hashmap<string, RoleInfo> roleInfos;
 
   // Add the default role.
@@ -4229,7 +4321,7 @@ void Master::addSlave(
 
   // Set up an observer for the slave.
   slave->observer = new SlaveObserver(
-      slave->pid, slave->info, slave->id, self());
+      slave->pid, slave->info, slave->id, self(), slaves.limiter);
 
   spawn(slave->observer);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d98e1b6..5a5c86f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -585,6 +585,12 @@ private:
     // TODO(bmahler): Ideally we could use a cache with set semantics.
     Cache<SlaveID, Nothing> removed;
 
+    // This rate limiter is used to limit the removal of slaves failing
+    // health checks.
+    // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
+    // a wrapper around a libprocess process, which is thread safe.
+    Option<memory::shared_ptr<process::RateLimiter>> limiter;
+
     bool transitioning(const Option<SlaveID>& slaveId)
     {
       if (slaveId.isSome()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index fea7801..b3af282 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -64,7 +64,7 @@ class PartitionTest : public MesosTest {};
 
 
 // This test checks that a scheduler gets a slave lost
-// message for a partioned slave.
+// message for a partitioned slave.
 TEST_F(PartitionTest, PartitionedSlave)
 {
   Try<PID<Master> > master = StartMaster();

http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index e7e2af6..956ce64 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -64,6 +64,8 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
+using namespace process;
+
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::Containerizer;
@@ -73,12 +75,6 @@ using mesos::internal::slave::MesosContainerizer;
 using mesos::internal::slave::MesosContainerizerProcess;
 using mesos::internal::slave::Slave;
 
-using process::Clock;
-using process::Future;
-using process::Message;
-using process::Owned;
-using process::PID;
-
 using std::map;
 using std::string;
 using std::vector;
@@ -151,7 +147,7 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
   tasks.push_back(task);
 
   // Drop the registration message from the executor to the slave.
-  Future<process::Message> registerExecutor =
+  Future<Message> registerExecutor =
     DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
 
   driver.launchTasks(offers.get()[0].id(), tasks);
@@ -234,7 +230,7 @@ TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)
   tasks.push_back(task);
 
   // Drop the registration message from the executor to the slave.
-  Future<process::Message> registerExecutorMessage =
+  Future<Message> registerExecutorMessage =
     DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
 
   driver.launchTasks(offers.get()[0].id(), tasks);
@@ -322,7 +318,7 @@ TEST_F(SlaveTest, CommandExecutorWithOverride)
   // Expect wait after launch is called but don't return anything
   // until after we've finished everything below.
   Future<Nothing> wait;
-  process::Promise<containerizer::Termination> promise;
+  Promise<containerizer::Termination> promise;
   EXPECT_CALL(containerizer, wait(_))
     .WillOnce(DoAll(FutureSatisfy(&wait),
                     Return(promise.future())));
@@ -359,12 +355,12 @@ TEST_F(SlaveTest, CommandExecutorWithOverride)
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2));
 
-  Try<process::Subprocess> executor =
-    process::subprocess(
+  Try<Subprocess> executor =
+    subprocess(
         executorCommand,
-        process::Subprocess::PIPE(),
-        process::Subprocess::PIPE(),
-        process::Subprocess::PIPE(),
+        Subprocess::PIPE(),
+        Subprocess::PIPE(),
+        Subprocess::PIPE(),
         environment);
 
   ASSERT_SOME(executor);
@@ -728,13 +724,13 @@ TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement)
     .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   // We need to grab this message to get the scheduler's pid.
-  Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+  Future<Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
       Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
 
   schedDriver.start();
 
   AWAIT_READY(frameworkRegisteredMessage);
-  const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+  const UPID schedulerPid = frameworkRegisteredMessage.get().to;
 
   AWAIT_READY(offers);
   EXPECT_NE(0u, offers.get().size());
@@ -776,10 +772,9 @@ TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement)
   AWAIT_READY(acknowledgementMessage);
 
   // Send the acknowledgement to the slave with a non-leading master.
-  process::post(
-      process::UPID("master@localhost:1"),
-      slave.get(),
-      acknowledgementMessage.get());
+  post(process::UPID("master@localhost:1"),
+       slave.get(),
+       acknowledgementMessage.get());
 
   // Make sure the acknowledgement was ignored.
   Clock::settle();
@@ -821,8 +816,8 @@ TEST_F(SlaveTest, MetricsInStatsEndpoint)
   Try<PID<Slave> > slave = StartSlave();
   ASSERT_SOME(slave);
 
-  Future<process::http::Response> response =
-    process::http::get(slave.get(), "stats.json");
+  Future<http::Response> response =
+    http::get(slave.get(), "stats.json");
 
   AWAIT_READY(response);
 
@@ -891,8 +886,8 @@ TEST_F(SlaveTest, StateEndpoint)
   Try<PID<Slave> > slave = StartSlave(flags);
   ASSERT_SOME(slave);
 
-  Future<process::http::Response> response =
-    process::http::get(slave.get(), "state.json");
+  Future<http::Response> response =
+    http::get(slave.get(), "state.json");
 
   AWAIT_READY(response);
 
@@ -1009,7 +1004,7 @@ TEST_F(SlaveTest, TerminatingSlaveDoesNotReregister)
 
   // Send a ShutdownMessage instead of calling Stop() directly
   // to avoid blocking.
-  process::post(master.get(), slave.get(), ShutdownMessage());
+  post(master.get(), slave.get(), ShutdownMessage());
 
   // Advance the clock to trigger doReliableRegistration().
   Clock::advance(slave::REGISTER_RETRY_INTERVAL_MAX * 2);
@@ -1096,7 +1091,7 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
 
   // Set up the containerizer so the next update() will fail.
   EXPECT_CALL(containerizer, update(_, _))
-    .WillOnce(Return(process::Failure("update() failed")))
+    .WillOnce(Return(Failure("update() failed")))
     .WillRepeatedly(Return(Nothing()));
 
   EXPECT_CALL(exec, killTask(_, _))
@@ -1201,6 +1196,151 @@ TEST_F(SlaveTest, PingTimeoutSomePings)
 }
 
 
+// This test ensures that when slave removal rate limit is specified
+// a slave that fails health checks is removed after acquiring a permit
+// from the rate limiter.
+TEST_F(SlaveTest, RateLimitSlaveShutdown)
+{
+  // Start a master.
+  master::Flags flags = CreateMasterFlags();
+  flags.slave_removal_rate_limit = "1/1secs";
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  // Set these expectations up before we spawn the slave so that we
+  // don't miss the first PING.
+  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+
+  // Drop all the PONGs to simulate health check timeout.
+  DROP_MESSAGES(Eq("PONG"), _, _);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Start a slave.
+  Try<PID<Slave> > slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  Future<Nothing> acquire =
+    FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+
+  Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+  // Now advance through the PINGs.
+  Clock::pause();
+  uint32_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+    Clock::advance(master::SLAVE_PING_TIMEOUT);
+  }
+
+  Clock::advance(master::SLAVE_PING_TIMEOUT);
+  Clock::settle();
+
+  // Master should acquire the permit for shutting down the slave.
+  AWAIT_READY(acquire);
+
+  // Master should shutdown the slave.
+  AWAIT_READY(shutdown);
+}
+
+
+// This test verifies that when a slave responds to pings after the
+// slave observer has scheduled it for shutdown (due to health check
+// failure), the shutdown is cancelled.
+TEST_F(SlaveTest, CancelSlaveShutdown)
+{
+  // Start a master.
+  master::Flags flags = CreateMasterFlags();
+  // Interval between slave removals.
+  Duration interval = master::SLAVE_PING_TIMEOUT * 10;
+  flags.slave_removal_rate_limit = "1/" + stringify(interval);
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  // Set these expectations up before we spawn the slave so that we
+  // don't miss the first PING.
+  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+
+  // Drop all the PONGs to simulate health check timeout.
+  DROP_MESSAGES(Eq("PONG"), _, _);
+
+  // NOTE: We start two slaves in this test so that the rate limiter
+  // used by the slave observers gives out 2 permits. The first permit
+  // gets satisfied immediately. And since the 2nd permit will be
+  // enqueued, its corresponding future can be discarded before it
+  // becomes ready.
+  // TODO(vinod): Inject a rate limiter into 'Master' instead to
+  // simplify the test.
+
+  // Start the first slave.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage1 =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<PID<Slave> > slave1 = StartSlave();
+  ASSERT_SOME(slave1);
+
+  AWAIT_READY(slaveRegisteredMessage1);
+
+  // Start the second slave.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage2 =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<PID<Slave> > slave2 = StartSlave();
+  ASSERT_SOME(slave2);
+
+  AWAIT_READY(slaveRegisteredMessage2);
+
+  Future<Nothing> acquire1 =
+    FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+
+  Future<Nothing> acquire2 =
+    FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+
+  Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+  // Now advance through the PINGs until shutdown permits are given
+  // out for both the slaves.
+  Clock::pause();
+  while (true) {
+    AWAIT_READY(ping);
+    if (acquire1.isReady() && acquire2.isReady()) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+    Clock::advance(master::SLAVE_PING_TIMEOUT);
+  }
+  Clock::settle();
+
+  // The slave that first timed out should be shutdown right away.
+  AWAIT_READY(shutdown);
+
+  // Ensure the 2nd slave's shutdown is canceled.
+  EXPECT_NO_FUTURE_PROTOBUFS(ShutdownMessage(), _, _);
+
+  // Reset the filters to allow pongs from the 2nd slave.
+  filter(NULL);
+
+  // Advance clock enough to do a ping pong.
+  Clock::advance(master::SLAVE_PING_TIMEOUT);
+  Clock::settle();
+
+  // Now advance the clock to the time the 2nd permit is acquired.
+  Clock::advance(interval);
+
+  // Settle the clock to give a chance for the master to shutdown
+  // the 2nd slave (it shouldn't in this test).
+  Clock::settle();
+}
+
+
 // This test ensures that a killTask() can happen between runTask()
 // and _runTask() and then gets "handled properly". This means that
 // the task never gets started, but also does not get lost. The end
@@ -1220,7 +1360,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   StandaloneMasterDetector detector(master.get());
 
   MockSlave slave(CreateSlaveFlags(), &detector, &containerizer);
-  process::spawn(slave);
+  spawn(slave);
 
   MockScheduler sched;
   MesosSchedulerDriver driver(
@@ -1311,8 +1451,8 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   driver.stop();
   driver.join();
 
-  process::terminate(slave);
-  process::wait(slave);
+  terminate(slave);
+  wait(slave);
 
   Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
 }
@@ -1497,8 +1637,8 @@ TEST_F(SlaveTest, TaskLabels)
   AWAIT_READY(update);
 
   // Verify label key and value in slave state.json.
-  Future<process::http::Response> response =
-    process::http::get(slave.get(), "state.json");
+  Future<http::Response> response =
+    http::get(slave.get(), "state.json");
   AWAIT_READY(response);
 
   EXPECT_SOME_EQ(
@@ -1685,7 +1825,7 @@ TEST_F(SlaveTest, CommandExecutorGracefulShutdown)
   // Ensure that a reap will occur within the grace period.
   Duration timeout = slave::getExecutorGracePeriod(
       flags.executor_shutdown_grace_period);
-  EXPECT_GT(timeout, process::MAX_REAP_INTERVAL());
+  EXPECT_GT(timeout, MAX_REAP_INTERVAL());
 
   Fetcher fetcher;
   Try<MesosContainerizer*> containerizer = MesosContainerizer::create(


[3/3] mesos git commit: Added metrics for slave shutdowns.

Posted by vi...@apache.org.
Added metrics for slave shutdowns.

Review: https://reviews.apache.org/r/30584


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3048e5e1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3048e5e1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3048e5e1

Branch: refs/heads/master
Commit: 3048e5e1686a5ae0a0f04fd30fdda0a380e9d13d
Parents: 886efef
Author: Vinod Kone <vi...@gmail.com>
Authored: Tue Feb 3 14:34:28 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Feb 4 16:27:49 2015 -0800

----------------------------------------------------------------------
 src/master/master.cpp  | 125 +++++++++++++++++++++++---------------------
 src/master/master.hpp  |   4 +-
 src/master/metrics.cpp |  12 ++++-
 src/master/metrics.hpp |   4 ++
 4 files changed, 84 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e42b922..234bbec 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -118,13 +118,15 @@ public:
                 const SlaveInfo& _slaveInfo,
                 const SlaveID& _slaveId,
                 const PID<Master>& _master,
-                const Option<shared_ptr<RateLimiter>>& _limiter)
+                const Option<shared_ptr<RateLimiter>>& _limiter,
+                const shared_ptr<Metrics> _metrics)
     : ProcessBase(process::ID::generate("slave-observer")),
       slave(_slave),
       slaveInfo(_slaveInfo),
       slaveId(_slaveId),
       master(_master),
       limiter(_limiter),
+      metrics(_metrics),
       timeouts(0),
       pinged(false),
       connected(true)
@@ -214,6 +216,8 @@ protected:
       acquire = limiter.get()->acquire();
     }
 
+    ++metrics->slave_shutdowns_scheduled;
+
     shuttingDown = acquire.onAny(defer(self(), &Self::_shutdown));
   }
 
@@ -236,6 +240,8 @@ protected:
     } else if (future.isDiscarded()) {
       LOG(INFO) << "Canceling shutdown of slave " << slaveId
                 << " since a pong is received!";
+
+      ++metrics->slave_shutdowns_canceled;
     }
 
     shuttingDown = None();
@@ -247,6 +253,7 @@ private:
   const SlaveID slaveId;
   const PID<Master> master;
   const Option<shared_ptr<RateLimiter>> limiter;
+  shared_ptr<Metrics> metrics;
   Option<Future<Nothing>> shuttingDown;
   uint32_t timeouts;
   bool pinged;
@@ -273,7 +280,7 @@ Master::Master(
     contender(_contender),
     detector(_detector),
     authorizer(_authorizer),
-    metrics(*this),
+    metrics(new Metrics(*this)),
     electedTime(None())
 {
   // NOTE: We populate 'info_' here instead of inside 'initialize()'
@@ -977,9 +984,9 @@ void Master::visit(const MessageEvent& event)
   // 'Master::Frameworks::principals' for details.
   if (principal.isSome()) {
     // If the framework has a principal, the counter must exist.
-    CHECK(metrics.frameworks.contains(principal.get()));
+    CHECK(metrics->frameworks.contains(principal.get()));
     Counter messages_received =
-      metrics.frameworks.get(principal.get()).get()->messages_received;
+      metrics->frameworks.get(principal.get()).get()->messages_received;
     ++messages_received;
   }
 
@@ -987,7 +994,7 @@ void Master::visit(const MessageEvent& event)
   if (!elected()) {
     VLOG(1) << "Dropping '" << event.message->name << "' message since "
             << "not elected yet";
-    ++metrics.dropped_messages;
+    ++metrics->dropped_messages;
     return;
   }
 
@@ -1001,7 +1008,7 @@ void Master::visit(const MessageEvent& event)
   if (!recovered.get().isReady()) {
     VLOG(1) << "Dropping '" << event.message->name << "' message since "
             << "not recovered yet";
-    ++metrics.dropped_messages;
+    ++metrics->dropped_messages;
     return;
   }
 
@@ -1121,9 +1128,9 @@ void Master::_visit(const MessageEvent& event)
   // Note that it could be removed in handling
   // 'UnregisterFrameworkMessage' if it's the last framework with
   // this principal.
-  if (principal.isSome() && metrics.frameworks.contains(principal.get())) {
+  if (principal.isSome() && metrics->frameworks.contains(principal.get())) {
     Counter messages_processed =
-      metrics.frameworks.get(principal.get()).get()->messages_processed;
+      metrics->frameworks.get(principal.get()).get()->messages_processed;
     ++messages_processed;
   }
 }
@@ -1249,7 +1256,7 @@ void Master::recoveredSlavesTimeout(const Registry& registry)
                  << " (" << slave.info().hostname() << ") did not re-register "
                  << "within the timeout; removing it from the registrar";
 
-    ++metrics.recovery_slave_removals;
+    ++metrics->recovery_slave_removals;
 
     slaves.recovered.erase(slave.info().id());
 
@@ -1542,7 +1549,7 @@ void Master::registerFramework(
     const UPID& from,
     const FrameworkInfo& frameworkInfo)
 {
-  ++metrics.messages_register_framework;
+  ++metrics->messages_register_framework;
 
   if (authenticating.contains(from)) {
     // TODO(vinod): Consider dropping this request and fix the tests
@@ -1652,7 +1659,7 @@ void Master::reregisterFramework(
     const FrameworkInfo& frameworkInfo,
     bool failover)
 {
-  ++metrics.messages_reregister_framework;
+  ++metrics->messages_reregister_framework;
 
   if (!frameworkInfo.has_id() || frameworkInfo.id() == "") {
     LOG(ERROR) << "Framework '" << frameworkInfo.name() << "' at " << from
@@ -1881,7 +1888,7 @@ void Master::unregisterFramework(
     const UPID& from,
     const FrameworkID& frameworkId)
 {
-  ++metrics.messages_unregister_framework;
+  ++metrics->messages_unregister_framework;
 
   LOG(INFO) << "Asked to unregister framework " << frameworkId;
 
@@ -1902,7 +1909,7 @@ void Master::deactivateFramework(
     const UPID& from,
     const FrameworkID& frameworkId)
 {
-  ++metrics.messages_deactivate_framework;
+  ++metrics->messages_deactivate_framework;
 
   Framework* framework = getFramework(frameworkId);
 
@@ -2005,7 +2012,7 @@ void Master::resourceRequest(
     const FrameworkID& frameworkId,
     const vector<Request>& requests)
 {
-  ++metrics.messages_resource_request;
+  ++metrics->messages_resource_request;
 
   Framework* framework = getFramework(frameworkId);
 
@@ -2036,9 +2043,9 @@ void Master::launchTasks(
     const vector<OfferID>& offerIds)
 {
   if (!tasks.empty()) {
-    ++metrics.messages_launch_tasks;
+    ++metrics->messages_launch_tasks;
   } else {
-    ++metrics.messages_decline_offers;
+    ++metrics->messages_decline_offers;
   }
 
   Framework* framework = getFramework(frameworkId);
@@ -2235,7 +2242,7 @@ void Master::accept(
             "Task launched with invalid offers: " + error.get().message,
             TaskStatus::REASON_INVALID_OFFERS);
 
-        metrics.tasks_lost++;
+        metrics->tasks_lost++;
         stats.tasks[TASK_LOST]++;
 
         forward(update, UPID(), framework);
@@ -2339,7 +2346,7 @@ void Master::_accept(
                 TaskStatus::REASON_SLAVE_REMOVED :
                 TaskStatus::REASON_SLAVE_DISCONNECTED);
 
-        metrics.tasks_lost++;
+        metrics->tasks_lost++;
         stats.tasks[TASK_LOST]++;
 
         forward(update, UPID(), framework);
@@ -2472,7 +2479,7 @@ void Master::_accept(
                     "Not authorized to launch as user '" + user + "'",
                 TaskStatus::REASON_TASK_UNAUTHORIZED);
 
-            metrics.tasks_error++;
+            metrics->tasks_error++;
             stats.tasks[TASK_ERROR]++;
 
             forward(update, UPID(), framework);
@@ -2497,7 +2504,7 @@ void Master::_accept(
                 validationError.get().message,
                 TaskStatus::REASON_TASK_INVALID);
 
-            metrics.tasks_error++;
+            metrics->tasks_error++;
             stats.tasks[TASK_ERROR]++;
 
             forward(update, UPID(), framework);
@@ -2554,7 +2561,7 @@ void Master::_accept(
 
 void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId)
 {
-  ++metrics.messages_revive_offers;
+  ++metrics->messages_revive_offers;
 
   Framework* framework = getFramework(frameworkId);
 
@@ -2582,7 +2589,7 @@ void Master::killTask(
     const FrameworkID& frameworkId,
     const TaskID& taskId)
 {
-  ++metrics.messages_kill_task;
+  ++metrics->messages_kill_task;
 
   LOG(INFO) << "Asked to kill task " << taskId
             << " of framework " << frameworkId;
@@ -2668,7 +2675,7 @@ void Master::statusUpdateAcknowledgement(
     const TaskID& taskId,
     const string& uuid)
 {
-  metrics.messages_status_update_acknowledgement++;
+  metrics->messages_status_update_acknowledgement++;
 
   // TODO(bmahler): Consider adding a message validator abstraction
   // for the master that takes care of all this boilerplate. Ideally
@@ -2684,7 +2691,7 @@ void Master::statusUpdateAcknowledgement(
       << "Ignoring status update acknowledgement message for task " << taskId
       << " of framework " << frameworkId << " on slave " << slaveId
       << " because the framework cannot be found";
-    metrics.invalid_status_update_acknowledgements++;
+    metrics->invalid_status_update_acknowledgements++;
     return;
   }
 
@@ -2693,7 +2700,7 @@ void Master::statusUpdateAcknowledgement(
       << "Ignoring status update acknowledgement message for task " << taskId
       << " of framework " << *framework << " on slave " << slaveId
       << " because it is not expected from " << from;
-    metrics.invalid_status_update_acknowledgements++;
+    metrics->invalid_status_update_acknowledgements++;
     return;
   }
 
@@ -2704,7 +2711,7 @@ void Master::statusUpdateAcknowledgement(
       << "Cannot send status update acknowledgement message for task " << taskId
       << " of framework " << *framework << " to slave " << slaveId
       << " because slave is not registered";
-    metrics.invalid_status_update_acknowledgements++;
+    metrics->invalid_status_update_acknowledgements++;
     return;
   }
 
@@ -2713,7 +2720,7 @@ void Master::statusUpdateAcknowledgement(
       << "Cannot send status update acknowledgement message for task " << taskId
       << " of framework " << *framework << " to slave " << *slave
       << " because slave is disconnected";
-    metrics.invalid_status_update_acknowledgements++;
+    metrics->invalid_status_update_acknowledgements++;
     return;
   }
 
@@ -2737,7 +2744,7 @@ void Master::statusUpdateAcknowledgement(
         << "Ignoring status update acknowledgement message for task " << taskId
         << " of framework " << *framework << " to slave " << *slave
         << " because it no update was sent by this master";
-      metrics.invalid_status_update_acknowledgements++;
+      metrics->invalid_status_update_acknowledgements++;
       return;
     }
 
@@ -2760,7 +2767,7 @@ void Master::statusUpdateAcknowledgement(
 
   send(slave->pid, message);
 
-  metrics.valid_status_update_acknowledgements++;
+  metrics->valid_status_update_acknowledgements++;
 }
 
 
@@ -2771,7 +2778,7 @@ void Master::schedulerMessage(
     const ExecutorID& executorId,
     const string& data)
 {
-  ++metrics.messages_framework_to_executor;
+  ++metrics->messages_framework_to_executor;
 
   Framework* framework = getFramework(frameworkId);
 
@@ -2781,7 +2788,7 @@ void Master::schedulerMessage(
       << " of framework " << frameworkId
       << " because the framework cannot be found";
     stats.invalidFrameworkMessages++;
-    metrics.invalid_framework_to_executor_messages++;
+    metrics->invalid_framework_to_executor_messages++;
     return;
   }
 
@@ -2791,7 +2798,7 @@ void Master::schedulerMessage(
       << " of framework " << *framework
       << " because it is not expected from " << from;
     stats.invalidFrameworkMessages++;
-    metrics.invalid_framework_to_executor_messages++;
+    metrics->invalid_framework_to_executor_messages++;
     return;
   }
 
@@ -2801,7 +2808,7 @@ void Master::schedulerMessage(
                  << *framework << " to slave " << slaveId
                  << " because slave is not registered";
     stats.invalidFrameworkMessages++;
-    metrics.invalid_framework_to_executor_messages++;
+    metrics->invalid_framework_to_executor_messages++;
     return;
   }
 
@@ -2810,7 +2817,7 @@ void Master::schedulerMessage(
                  << *framework << " to slave " << *slave
                  << " because slave is disconnected";
     stats.invalidFrameworkMessages++;
-    metrics.invalid_framework_to_executor_messages++;
+    metrics->invalid_framework_to_executor_messages++;
     return;
   }
 
@@ -2825,7 +2832,7 @@ void Master::schedulerMessage(
   send(slave->pid, message);
 
   stats.validFrameworkMessages++;
-  metrics.valid_framework_to_executor_messages++;
+  metrics->valid_framework_to_executor_messages++;
 }
 
 
@@ -2835,7 +2842,7 @@ void Master::registerSlave(
     const vector<Resource>& checkpointedResources,
     const string& version)
 {
-  ++metrics.messages_register_slave;
+  ++metrics->messages_register_slave;
 
   if (authenticating.contains(from)) {
     LOG(INFO) << "Queuing up registration request from " << from
@@ -2953,7 +2960,7 @@ void Master::_registerSlave(
         Clock::now(),
         checkpointedResources);
 
-    ++metrics.slave_registrations;
+    ++metrics->slave_registrations;
 
     addSlave(slave);
 
@@ -2976,7 +2983,7 @@ void Master::reregisterSlave(
     const vector<Archive::Framework>& completedFrameworks,
     const string& version)
 {
-  ++metrics.messages_reregister_slave;
+  ++metrics->messages_reregister_slave;
 
   if (authenticating.contains(from)) {
     LOG(INFO) << "Queuing up re-registration request from " << from
@@ -3149,7 +3156,7 @@ void Master::_reregisterSlave(
 
     slave->reregisteredTime = Clock::now();
 
-    ++metrics.slave_reregistrations;
+    ++metrics->slave_reregistrations;
 
     addSlave(slave, completedFrameworks);
 
@@ -3198,7 +3205,7 @@ void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks)
 
 void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
 {
-  ++metrics.messages_unregister_slave;
+  ++metrics->messages_unregister_slave;
 
   LOG(INFO) << "Asked to unregister slave " << slaveId;
 
@@ -3219,7 +3226,7 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
 // because the status updates will be sent by the slave.
 void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 {
-  ++metrics.messages_status_update;
+  ++metrics->messages_status_update;
 
   if (slaves.removed.get(update.slave_id()).isSome()) {
     // If the slave is removed, we have already informed
@@ -3235,7 +3242,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
     send(pid, message);
 
     stats.invalidStatusUpdates++;
-    metrics.invalid_status_updates++;
+    metrics->invalid_status_updates++;
     return;
   }
 
@@ -3246,7 +3253,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
                  << " from unknown slave " << pid
                  << " with id " << update.slave_id();
     stats.invalidStatusUpdates++;
-    metrics.invalid_status_updates++;
+    metrics->invalid_status_updates++;
     return;
   }
 
@@ -3257,7 +3264,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
                  << " from slave " << *slave
                  << " because the framework is unknown";
     stats.invalidStatusUpdates++;
-    metrics.invalid_status_updates++;
+    metrics->invalid_status_updates++;
     return;
   }
 
@@ -3272,7 +3279,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
     LOG(WARNING) << "Could not lookup task for status update " << update
                  << " from slave " << *slave;
     stats.invalidStatusUpdates++;
-    metrics.invalid_status_updates++;
+    metrics->invalid_status_updates++;
     return;
   }
 
@@ -3285,7 +3292,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
   }
 
   stats.validStatusUpdates++;
-  metrics.valid_status_updates++;
+  metrics->valid_status_updates++;
 }
 
 
@@ -3319,7 +3326,7 @@ void Master::exitedExecutor(
     const ExecutorID& executorId,
     int32_t status)
 {
-  ++metrics.messages_exited_executor;
+  ++metrics->messages_exited_executor;
 
   if (slaves.removed.get(slaveId).isSome()) {
     // If the slave is removed, we have already informed
@@ -3394,7 +3401,7 @@ void Master::reconcileTasks(
     const FrameworkID& frameworkId,
     const std::vector<TaskStatus>& statuses)
 {
-  ++metrics.messages_reconcile_tasks;
+  ++metrics->messages_reconcile_tasks;
 
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
@@ -3735,7 +3742,7 @@ void Master::offer(const FrameworkID& frameworkId,
 // 'authenticate' message doesn't contain the 'FrameworkID'.
 void Master::authenticate(const UPID& from, const UPID& pid)
 {
-  ++metrics.messages_authenticate;
+  ++metrics->messages_authenticate;
 
   // An authentication request is sent by a client (slave/framework)
   // in the following cases:
@@ -4073,8 +4080,8 @@ void Master::addFramework(Framework* framework)
   if (principal.isSome()) {
     // Create new framework metrics if this framework is the first
     // one of this principal. Otherwise existing metrics are reused.
-    if (!metrics.frameworks.contains(principal.get())) {
-      metrics.frameworks.put(
+    if (!metrics->frameworks.contains(principal.get())) {
+      metrics->frameworks.put(
           principal.get(),
           Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get())));
     }
@@ -4252,8 +4259,8 @@ void Master::removeFramework(Framework* framework)
     // Remove the metrics for the principal if this framework is the
     // last one with this principal.
     if (!frameworks.principals.containsValue(principal.get())) {
-      CHECK(metrics.frameworks.contains(principal.get()));
-      metrics.frameworks.erase(principal.get());
+      CHECK(metrics->frameworks.contains(principal.get()));
+      metrics->frameworks.erase(principal.get());
     }
   }
 
@@ -4321,7 +4328,7 @@ void Master::addSlave(
 
   // Set up an observer for the slave.
   slave->observer = new SlaveObserver(
-      slave->pid, slave->info, slave->id, self(), slaves.limiter);
+      slave->pid, slave->info, slave->id, self(), slaves.limiter, metrics);
 
   spawn(slave->observer);
 
@@ -4496,7 +4503,7 @@ void Master::_removeSlave(
 
   LOG(INFO) << "Removed slave " << slaveInfo.id() << " ("
             << slaveInfo.hostname() << ")";
-  ++metrics.slave_removals;
+  ++metrics->slave_removals;
 
   // Forward the LOST updates on to the framework.
   foreach (const StatusUpdate& update, updates) {
@@ -4611,10 +4618,10 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
     }
 
     switch (task->state()) {
-      case TASK_FINISHED: ++metrics.tasks_finished; break;
-      case TASK_FAILED:   ++metrics.tasks_failed;   break;
-      case TASK_KILLED:   ++metrics.tasks_killed;   break;
-      case TASK_LOST:     ++metrics.tasks_lost;     break;
+      case TASK_FINISHED: ++metrics->tasks_finished; break;
+      case TASK_FAILED:   ++metrics->tasks_failed;   break;
+      case TASK_KILLED:   ++metrics->tasks_killed;   break;
+      case TASK_LOST:     ++metrics->tasks_lost;     break;
       default: break;
     }
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 5a5c86f..dcfd38a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -666,7 +666,9 @@ private:
     uint64_t invalidFrameworkMessages;
   } stats;
 
-  Metrics metrics;
+  // NOTE: 'metrics' is shared with each 'SlaveObserver'. Using a
+  // 'shared_ptr' for this is safe because 'Metrics' is thread safe.
+  memory::shared_ptr<Metrics> metrics;
 
   // Gauge handlers.
   double _uptime_secs()

http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/metrics.cpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp
index 956fe50..a5cde16 100644
--- a/src/master/metrics.cpp
+++ b/src/master/metrics.cpp
@@ -148,7 +148,11 @@ Metrics::Metrics(const Master& master)
     slave_reregistrations(
         "master/slave_reregistrations"),
     slave_removals(
-        "master/slave_removals")
+        "master/slave_removals"),
+    slave_shutdowns_scheduled(
+         "master/slave_shutdowns_scheduled"),
+    slave_shutdowns_canceled(
+         "master/slave_shutdowns_canceled")
 {
   // TODO(dhamon): Check return values of 'add'.
   process::metrics::add(uptime_secs);
@@ -220,6 +224,9 @@ Metrics::Metrics(const Master& master)
   process::metrics::add(slave_reregistrations);
   process::metrics::add(slave_removals);
 
+  process::metrics::add(slave_shutdowns_scheduled);
+  process::metrics::add(slave_shutdowns_canceled);
+
   // Create resource gauges.
   // TODO(dhamon): Set these up dynamically when adding a slave based on the
   // resources the slave exposes.
@@ -319,6 +326,9 @@ Metrics::~Metrics()
   process::metrics::remove(slave_reregistrations);
   process::metrics::remove(slave_removals);
 
+  process::metrics::remove(slave_shutdowns_scheduled);
+  process::metrics::remove(slave_shutdowns_canceled);
+
   foreach (const process::metrics::Gauge& gauge, resources_total) {
     process::metrics::remove(gauge);
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/metrics.hpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp
index 6a43abc..5e18f88 100644
--- a/src/master/metrics.hpp
+++ b/src/master/metrics.hpp
@@ -156,6 +156,10 @@ struct Metrics
   process::metrics::Counter slave_reregistrations;
   process::metrics::Counter slave_removals;
 
+  // Slave observer metrics.
+  process::metrics::Counter slave_shutdowns_scheduled;
+  process::metrics::Counter slave_shutdowns_canceled;
+
   // Resource metrics.
   std::vector<process::metrics::Gauge> resources_total;
   std::vector<process::metrics::Gauge> resources_used;