You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/02/05 01:28:06 UTC
[1/3] mesos git commit: Moved framework related rate limiters into
Master::Frameworks.
Repository: mesos
Updated Branches:
refs/heads/master b05e2f0bb -> 3048e5e16
Moved framework related rate limiters into Master::Frameworks.
Review: https://reviews.apache.org/r/30511
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fafccbd9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fafccbd9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fafccbd9
Branch: refs/heads/master
Commit: fafccbd9dcb92d4db27bc272d7443ac6ebecbae8
Parents: b05e2f0
Author: Vinod Kone <vi...@gmail.com>
Authored: Thu Jan 29 12:44:53 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Feb 4 16:27:48 2015 -0800
----------------------------------------------------------------------
src/master/master.cpp | 71 ++++++++++++++++++++++++++++++----------------
src/master/master.hpp | 35 +++++++----------------
2 files changed, 57 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/fafccbd9/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 46caa55..d5c4beb 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -257,6 +257,25 @@ Master::Master(
Master::~Master() {}
+// TODO(vinod): Update this interface to return failed futures when
+// capacity is reached.
+struct BoundedRateLimiter
+{
+ BoundedRateLimiter(double qps, Option<uint64_t> _capacity)
+ : limiter(new process::RateLimiter(qps)),
+ capacity(_capacity),
+ messages(0) {}
+
+ process::Owned<process::RateLimiter> limiter;
+ const Option<uint64_t> capacity;
+
+ // Number of outstanding messages for this RateLimiter.
+ // NOTE: ExitedEvents are throttled but not counted towards
+ // the capacity here.
+ uint64_t messages;
+};
+
+
void Master::initialize()
{
LOG(INFO) << "Master " << info_.id() << " (" << info_.hostname() << ")"
@@ -360,7 +379,7 @@ void Master::initialize()
if (flags.rate_limits.isSome()) {
// Add framework rate limiters.
foreach (const RateLimit& limit_, flags.rate_limits.get().limits()) {
- if (limiters.contains(limit_.principal())) {
+ if (frameworks.limiters.contains(limit_.principal())) {
EXIT(1) << "Duplicate principal " << limit_.principal()
<< " found in RateLimits configuration";
}
@@ -375,12 +394,12 @@ void Master::initialize()
if (limit_.has_capacity()) {
capacity = limit_.capacity();
}
- limiters.put(
+ frameworks.limiters.put(
limit_.principal(),
Owned<BoundedRateLimiter>(
new BoundedRateLimiter(limit_.qps(), capacity)));
} else {
- limiters.put(limit_.principal(), None());
+ frameworks.limiters.put(limit_.principal(), None());
}
}
@@ -396,7 +415,7 @@ void Master::initialize()
if (flags.rate_limits.get().has_aggregate_default_capacity()) {
capacity = flags.rate_limits.get().aggregate_default_capacity();
}
- defaultLimiter = Owned<BoundedRateLimiter>(
+ frameworks.defaultLimiter = Owned<BoundedRateLimiter>(
new BoundedRateLimiter(
flags.rate_limits.get().aggregate_default_qps(), capacity));
}
@@ -905,9 +924,10 @@ void Master::visit(const MessageEvent& event)
// above. (or)
// 2) the principal exists in RateLimits but 'qps' is not set.
if (principal.isSome() &&
- limiters.contains(principal.get()) &&
- limiters[principal.get()].isSome()) {
- const Owned<BoundedRateLimiter>& limiter = limiters[principal.get()].get();
+ frameworks.limiters.contains(principal.get()) &&
+ frameworks.limiters[principal.get()].isSome()) {
+ const Owned<BoundedRateLimiter>& limiter =
+ frameworks.limiters[principal.get()].get();
if (limiter->capacity.isNone() ||
limiter->messages < limiter->capacity.get()) {
@@ -920,19 +940,21 @@ void Master::visit(const MessageEvent& event)
principal,
limiter->capacity.get());
}
- } else if ((principal.isNone() || !limiters.contains(principal.get())) &&
+ } else if ((principal.isNone() ||
+ !frameworks.limiters.contains(principal.get())) &&
isRegisteredFramework &&
- defaultLimiter.isSome()) {
- if (defaultLimiter.get()->capacity.isNone() ||
- defaultLimiter.get()->messages < defaultLimiter.get()->capacity.get()) {
- defaultLimiter.get()->messages++;
- defaultLimiter.get()->limiter->acquire()
+ frameworks.defaultLimiter.isSome()) {
+ if (frameworks.defaultLimiter.get()->capacity.isNone() ||
+ frameworks.defaultLimiter.get()->messages <
+ frameworks.defaultLimiter.get()->capacity.get()) {
+ frameworks.defaultLimiter.get()->messages++;
+ frameworks.defaultLimiter.get()->limiter->acquire()
.onReady(defer(self(), &Self::throttled, event, None()));
} else {
exceededCapacity(
event,
principal,
- defaultLimiter.get()->capacity.get());
+ frameworks.defaultLimiter.get()->capacity.get());
}
} else {
_visit(event);
@@ -957,14 +979,15 @@ void Master::visit(const ExitedEvent& event)
typedef void(Self::*F)(const ExitedEvent&);
if (principal.isSome() &&
- limiters.contains(principal.get()) &&
- limiters[principal.get()].isSome()) {
- limiters[principal.get()].get()->limiter->acquire()
+ frameworks.limiters.contains(principal.get()) &&
+ frameworks.limiters[principal.get()].isSome()) {
+ frameworks.limiters[principal.get()].get()->limiter->acquire()
.onReady(defer(self(), static_cast<F>(&Self::_visit), event));
- } else if ((principal.isNone() || !limiters.contains(principal.get())) &&
+ } else if ((principal.isNone() ||
+ !frameworks.limiters.contains(principal.get())) &&
isRegisteredFramework &&
- defaultLimiter.isSome()) {
- defaultLimiter.get()->limiter->acquire()
+ frameworks.defaultLimiter.isSome()) {
+ frameworks.defaultLimiter.get()->limiter->acquire()
.onReady(defer(self(), static_cast<F>(&Self::_visit), event));
} else {
_visit(event);
@@ -979,11 +1002,11 @@ void Master::throttled(
// We already know a RateLimiter is used to throttle this event so
// here we only need to determine which.
if (principal.isSome()) {
- CHECK_SOME(limiters[principal.get()]);
- limiters[principal.get()].get()->messages--;
+ CHECK_SOME(frameworks.limiters[principal.get()]);
+ frameworks.limiters[principal.get()].get()->messages--;
} else {
- CHECK_SOME(defaultLimiter);
- defaultLimiter.get()->messages--;
+ CHECK_SOME(frameworks.defaultLimiter);
+ frameworks.defaultLimiter.get()->messages--;
}
_visit(event);
http://git-wip-us.apache.org/repos/asf/mesos/blob/fafccbd9/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 9d8c508..d98e1b6 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -86,6 +86,7 @@ class Allocator;
class Repairer;
class SlaveObserver;
+struct BoundedRateLimiter;
struct Framework;
struct Role;
struct Slave;
@@ -615,6 +616,15 @@ private:
// allows them) if they have principals specified in
// FrameworkInfo.
hashmap<process::UPID, Option<std::string>> principals;
+
+ // BoundedRateLimiters keyed by the framework principal.
+ // Like Metrics::Frameworks, all frameworks of the same principal
+ // are throttled together at a common rate limit.
+ hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters;
+
+ // The default limiter is for frameworks not specified in
+ // 'flags.rate_limits'.
+ Option<process::Owned<BoundedRateLimiter>> defaultLimiter;
} frameworks;
hashmap<OfferID, Offer*> offers;
@@ -712,31 +722,6 @@ private:
process::Future<Option<Error>> validate(
const FrameworkInfo& frameworkInfo,
const process::UPID& from);
-
- struct BoundedRateLimiter
- {
- BoundedRateLimiter(double qps, Option<uint64_t> _capacity)
- : limiter(new process::RateLimiter(qps)),
- capacity(_capacity),
- messages(0) {}
-
- process::Owned<process::RateLimiter> limiter;
- const Option<uint64_t> capacity;
-
- // Number of outstanding messages for this RateLimiter.
- // NOTE: ExitedEvents are throttled but not counted towards
- // the capacity here.
- uint64_t messages;
- };
-
- // BoundedRateLimiters keyed by the framework principal.
- // Like Metrics::Frameworks, all frameworks of the same principal
- // are throttled together at a common rate limit.
- hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters;
-
- // The default limiter is for frameworks not specified in
- // 'flags.rate_limits'.
- Option<process::Owned<BoundedRateLimiter>> defaultLimiter;
};
[2/3] mesos git commit: Rate limited the removal of slaves failing
health checks.
Posted by vi...@apache.org.
Rate limited the removal of slaves failing health checks.
Review: https://reviews.apache.org/r/30514
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/886efefc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/886efefc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/886efefc
Branch: refs/heads/master
Commit: 886efefc5f294b3ea22c1fa2ce70a9e9324eca19
Parents: fafccbd
Author: Vinod Kone <vi...@gmail.com>
Authored: Thu Jan 29 11:51:02 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Feb 4 16:27:49 2015 -0800
----------------------------------------------------------------------
src/master/flags.hpp | 10 ++
src/master/master.cpp | 104 +++++++++++++++++--
src/master/master.hpp | 6 ++
src/tests/partition_tests.cpp | 2 +-
src/tests/slave_tests.cpp | 204 +++++++++++++++++++++++++++++++------
5 files changed, 287 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index 6c18a1a..e9f6fff 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -147,6 +147,15 @@ public:
"Values: [0%-100%]",
stringify(RECOVERY_SLAVE_REMOVAL_PERCENT_LIMIT * 100.0) + "%");
+ // TODO(vinod): Add a 'Rate' abstraction in stout and the
+ // corresponding parser for flags.
+ add(&Flags::slave_removal_rate_limit,
+ "slave_removal_rate_limit",
+ "The maximum rate (e.g., 1/10mins, 2/3hrs, etc) at which slaves will\n"
+ "be removed from the master when they fail health checks. By default\n"
+ "slaves will be removed as soon as they fail the health checks.\n"
+ "The value is of the form <Number of slaves>/<Duration>.");
+
add(&Flags::webui_dir,
"webui_dir",
"Directory path of the webui files/assets",
@@ -377,6 +386,7 @@ public:
bool log_auto_initialize;
Duration slave_reregister_timeout;
std::string recovery_slave_removal_limit;
+ Option<std::string> slave_removal_rate_limit;
std::string webui_dir;
std::string whitelist;
std::string user_sorter;
http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d5c4beb..e42b922 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -94,6 +94,7 @@ using process::Owned;
using process::PID;
using process::Process;
using process::Promise;
+using process::RateLimiter;
using process::Shared;
using process::Time;
using process::Timer;
@@ -116,12 +117,14 @@ public:
SlaveObserver(const UPID& _slave,
const SlaveInfo& _slaveInfo,
const SlaveID& _slaveId,
- const PID<Master>& _master)
+ const PID<Master>& _master,
+ const Option<shared_ptr<RateLimiter>>& _limiter)
: ProcessBase(process::ID::generate("slave-observer")),
slave(_slave),
slaveInfo(_slaveInfo),
slaveId(_slaveId),
master(_master),
+ limiter(_limiter),
timeouts(0),
pinged(false),
connected(true)
@@ -167,23 +170,75 @@ protected:
{
timeouts = 0;
pinged = false;
+
+ // Cancel any pending shutdown.
+ if (shuttingDown.isSome()) {
+ // Need a copy for non-const access.
+ Future<Nothing> future = shuttingDown.get();
+ future.discard();
+ }
}
void timeout()
{
- if (pinged) { // So we haven't got back a pong yet ...
- if (++timeouts >= MAX_SLAVE_PING_TIMEOUTS) {
+ if (pinged) {
+ timeouts++; // No pong has been received before the timeout.
+ if (timeouts >= MAX_SLAVE_PING_TIMEOUTS) {
+ // No pong has been received for the last
+ // 'MAX_SLAVE_PING_TIMEOUTS' pings.
shutdown();
- return;
}
}
+ // NOTE: We keep pinging even if we schedule a shutdown. This is
+ // because if the slave eventually responds to a ping, we can
+ // cancel the shutdown.
ping();
}
+ // NOTE: The shutdown of the slave is rate limited and can be
+ // canceled if a pong was received before the actual shutdown is
+ // called.
void shutdown()
{
- dispatch(master, &Master::shutdownSlave, slaveId, "health check timed out");
+ if (shuttingDown.isSome()) {
+ return; // Shutdown is already in progress.
+ }
+
+ Future<Nothing> acquire = Nothing();
+
+ if (limiter.isSome()) {
+ LOG(INFO) << "Scheduling shutdown of slave " << slaveId
+ << " due to health check timeout";
+
+ acquire = limiter.get()->acquire();
+ }
+
+ shuttingDown = acquire.onAny(defer(self(), &Self::_shutdown));
+ }
+
+ void _shutdown()
+ {
+ CHECK_SOME(shuttingDown);
+
+ const Future<Nothing>& future = shuttingDown.get();
+
+ CHECK(!future.isFailed());
+
+ if (future.isReady()) {
+ LOG(INFO) << "Shutting down slave " << slaveId
+ << " due to health check timeout";
+
+ dispatch(master,
+ &Master::shutdownSlave,
+ slaveId,
+ "health check timed out");
+ } else if (future.isDiscarded()) {
+ LOG(INFO) << "Canceling shutdown of slave " << slaveId
+ << " since a pong is received!";
+ }
+
+ shuttingDown = None();
}
private:
@@ -191,6 +246,8 @@ private:
const SlaveInfo slaveInfo;
const SlaveID slaveId;
const PID<Master> master;
+ const Option<shared_ptr<RateLimiter>> limiter;
+ Option<Future<Nothing>> shuttingDown;
uint32_t timeouts;
bool pinged;
bool connected;
@@ -423,6 +480,41 @@ void Master::initialize()
LOG(INFO) << "Framework rate limiting enabled";
}
+ if (flags.slave_removal_rate_limit.isSome()) {
+ LOG(INFO) << "Slave removal is rate limited to "
+ << flags.slave_removal_rate_limit.get();
+
+ // Parse the flag value.
+ // TODO(vinod): Move this parsing logic to flags once we have a
+ // 'Rate' abstraction in stout.
+ vector<string> tokens =
+ strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+ if (tokens.size() != 2) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>";
+ }
+
+ Try<int> permits = numify<int>(tokens[0]);
+ if (permits.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << permits.error();
+ }
+
+ Try<Duration> duration = Duration::parse(tokens[1]);
+ if (duration.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << duration.error();
+ }
+
+ slaves.limiter = new RateLimiter(permits.get(), duration.get());
+ }
+
hashmap<string, RoleInfo> roleInfos;
// Add the default role.
@@ -4229,7 +4321,7 @@ void Master::addSlave(
// Set up an observer for the slave.
slave->observer = new SlaveObserver(
- slave->pid, slave->info, slave->id, self());
+ slave->pid, slave->info, slave->id, self(), slaves.limiter);
spawn(slave->observer);
http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d98e1b6..5a5c86f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -585,6 +585,12 @@ private:
// TODO(bmahler): Ideally we could use a cache with set semantics.
Cache<SlaveID, Nothing> removed;
+ // This rate limiter is used to limit the removal of slaves failing
+ // health checks.
+ // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
+ // a wrapper around a libprocess process, which is thread safe.
+ Option<memory::shared_ptr<process::RateLimiter>> limiter;
+
bool transitioning(const Option<SlaveID>& slaveId)
{
if (slaveId.isSome()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index fea7801..b3af282 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -64,7 +64,7 @@ class PartitionTest : public MesosTest {};
// This test checks that a scheduler gets a slave lost
-// message for a partioned slave.
+// message for a partitioned slave.
TEST_F(PartitionTest, PartitionedSlave)
{
Try<PID<Master> > master = StartMaster();
http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index e7e2af6..956ce64 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -64,6 +64,8 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
+using namespace process;
+
using mesos::internal::master::Master;
using mesos::internal::slave::Containerizer;
@@ -73,12 +75,6 @@ using mesos::internal::slave::MesosContainerizer;
using mesos::internal::slave::MesosContainerizerProcess;
using mesos::internal::slave::Slave;
-using process::Clock;
-using process::Future;
-using process::Message;
-using process::Owned;
-using process::PID;
-
using std::map;
using std::string;
using std::vector;
@@ -151,7 +147,7 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
tasks.push_back(task);
// Drop the registration message from the executor to the slave.
- Future<process::Message> registerExecutor =
+ Future<Message> registerExecutor =
DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
driver.launchTasks(offers.get()[0].id(), tasks);
@@ -234,7 +230,7 @@ TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)
tasks.push_back(task);
// Drop the registration message from the executor to the slave.
- Future<process::Message> registerExecutorMessage =
+ Future<Message> registerExecutorMessage =
DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
driver.launchTasks(offers.get()[0].id(), tasks);
@@ -322,7 +318,7 @@ TEST_F(SlaveTest, CommandExecutorWithOverride)
// Expect wait after launch is called but don't return anything
// until after we've finished everything below.
Future<Nothing> wait;
- process::Promise<containerizer::Termination> promise;
+ Promise<containerizer::Termination> promise;
EXPECT_CALL(containerizer, wait(_))
.WillOnce(DoAll(FutureSatisfy(&wait),
Return(promise.future())));
@@ -359,12 +355,12 @@ TEST_F(SlaveTest, CommandExecutorWithOverride)
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
- Try<process::Subprocess> executor =
- process::subprocess(
+ Try<Subprocess> executor =
+ subprocess(
executorCommand,
- process::Subprocess::PIPE(),
- process::Subprocess::PIPE(),
- process::Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
environment);
ASSERT_SOME(executor);
@@ -728,13 +724,13 @@ TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement)
.WillRepeatedly(Return()); // Ignore subsequent offers.
// We need to grab this message to get the scheduler's pid.
- Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+ Future<Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
schedDriver.start();
AWAIT_READY(frameworkRegisteredMessage);
- const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+ const UPID schedulerPid = frameworkRegisteredMessage.get().to;
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
@@ -776,10 +772,9 @@ TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement)
AWAIT_READY(acknowledgementMessage);
// Send the acknowledgement to the slave with a non-leading master.
- process::post(
- process::UPID("master@localhost:1"),
- slave.get(),
- acknowledgementMessage.get());
+ post(process::UPID("master@localhost:1"),
+ slave.get(),
+ acknowledgementMessage.get());
// Make sure the acknowledgement was ignored.
Clock::settle();
@@ -821,8 +816,8 @@ TEST_F(SlaveTest, MetricsInStatsEndpoint)
Try<PID<Slave> > slave = StartSlave();
ASSERT_SOME(slave);
- Future<process::http::Response> response =
- process::http::get(slave.get(), "stats.json");
+ Future<http::Response> response =
+ http::get(slave.get(), "stats.json");
AWAIT_READY(response);
@@ -891,8 +886,8 @@ TEST_F(SlaveTest, StateEndpoint)
Try<PID<Slave> > slave = StartSlave(flags);
ASSERT_SOME(slave);
- Future<process::http::Response> response =
- process::http::get(slave.get(), "state.json");
+ Future<http::Response> response =
+ http::get(slave.get(), "state.json");
AWAIT_READY(response);
@@ -1009,7 +1004,7 @@ TEST_F(SlaveTest, TerminatingSlaveDoesNotReregister)
// Send a ShutdownMessage instead of calling Stop() directly
// to avoid blocking.
- process::post(master.get(), slave.get(), ShutdownMessage());
+ post(master.get(), slave.get(), ShutdownMessage());
// Advance the clock to trigger doReliableRegistration().
Clock::advance(slave::REGISTER_RETRY_INTERVAL_MAX * 2);
@@ -1096,7 +1091,7 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
// Set up the containerizer so the next update() will fail.
EXPECT_CALL(containerizer, update(_, _))
- .WillOnce(Return(process::Failure("update() failed")))
+ .WillOnce(Return(Failure("update() failed")))
.WillRepeatedly(Return(Nothing()));
EXPECT_CALL(exec, killTask(_, _))
@@ -1201,6 +1196,151 @@ TEST_F(SlaveTest, PingTimeoutSomePings)
}
+// This test ensures that when a slave removal rate limit is specified,
+// a slave that fails health checks is removed after acquiring a permit
+// from the rate limiter.
+TEST_F(SlaveTest, RateLimitSlaveShutdown)
+{
+ // Start a master.
+ master::Flags flags = CreateMasterFlags();
+ flags.slave_removal_rate_limit = "1/1secs";
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ // Set these expectations up before we spawn the slave so that we
+ // don't miss the first PING.
+ Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+
+ // Drop all the PONGs to simulate health check timeout.
+ DROP_MESSAGES(Eq("PONG"), _, _);
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ // Start a slave.
+ Try<PID<Slave> > slave = StartSlave();
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ Future<Nothing> acquire =
+ FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+
+ Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+ // Now advance through the PINGs.
+ Clock::pause();
+ uint32_t pings = 0;
+ while (true) {
+ AWAIT_READY(ping);
+ pings++;
+ if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+ break;
+ }
+ ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ }
+
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
+
+ // Master should acquire the permit for shutting down the slave.
+ AWAIT_READY(acquire);
+
+ // Master should shut down the slave.
+ AWAIT_READY(shutdown);
+}
+
+
+// This test verifies that when a slave responds to pings after the
+// slave observer has scheduled it for shutdown (due to health check
+// failure), the shutdown is canceled.
+TEST_F(SlaveTest, CancelSlaveShutdown)
+{
+ // Start a master.
+ master::Flags flags = CreateMasterFlags();
+ // Interval between slave removals.
+ Duration interval = master::SLAVE_PING_TIMEOUT * 10;
+ flags.slave_removal_rate_limit = "1/" + stringify(interval);
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ // Set these expectations up before we spawn the slave so that we
+ // don't miss the first PING.
+ Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+
+ // Drop all the PONGs to simulate health check timeout.
+ DROP_MESSAGES(Eq("PONG"), _, _);
+
+ // NOTE: We start two slaves in this test so that the rate limiter
+ // used by the slave observers gives out 2 permits. The first permit
+ // gets satisfied immediately. And since the 2nd permit will be
+ // enqueued, its corresponding future can be discarded before it
+ // becomes ready.
+ // TODO(vinod): Inject a rate limiter into 'Master' instead to
+ // simplify the test.
+
+ // Start the first slave.
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage1 =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ Try<PID<Slave> > slave1 = StartSlave();
+ ASSERT_SOME(slave1);
+
+ AWAIT_READY(slaveRegisteredMessage1);
+
+ // Start the second slave.
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage2 =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ Try<PID<Slave> > slave2 = StartSlave();
+ ASSERT_SOME(slave2);
+
+ AWAIT_READY(slaveRegisteredMessage2);
+
+ Future<Nothing> acquire1 =
+ FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+
+ Future<Nothing> acquire2 =
+ FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+
+ Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+ // Now advance through the PINGs until shutdown permits are given
+ // out for both slaves.
+ Clock::pause();
+ while (true) {
+ AWAIT_READY(ping);
+ if (acquire1.isReady() && acquire2.isReady()) {
+ break;
+ }
+ ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ }
+ Clock::settle();
+
+ // The slave that first timed out should be shut down right away.
+ AWAIT_READY(shutdown);
+
+ // Ensure the 2nd slave's shutdown is canceled.
+ EXPECT_NO_FUTURE_PROTOBUFS(ShutdownMessage(), _, _);
+
+ // Reset the filters to allow pongs from the 2nd slave.
+ filter(NULL);
+
+ // Advance clock enough to do a ping pong.
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
+
+ // Now advance the clock to the time the 2nd permit is acquired.
+ Clock::advance(interval);
+
+ // Settle the clock to give the master a chance to shut down
+ // the 2nd slave (it shouldn't in this test).
+ Clock::settle();
+}
+
+
// This test ensures that a killTask() can happen between runTask()
// and _runTask() and then gets "handled properly". This means that
// the task never gets started, but also does not get lost. The end
@@ -1220,7 +1360,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
StandaloneMasterDetector detector(master.get());
MockSlave slave(CreateSlaveFlags(), &detector, &containerizer);
- process::spawn(slave);
+ spawn(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
@@ -1311,8 +1451,8 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
driver.stop();
driver.join();
- process::terminate(slave);
- process::wait(slave);
+ terminate(slave);
+ wait(slave);
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
@@ -1497,8 +1637,8 @@ TEST_F(SlaveTest, TaskLabels)
AWAIT_READY(update);
// Verify label key and value in slave state.json.
- Future<process::http::Response> response =
- process::http::get(slave.get(), "state.json");
+ Future<http::Response> response =
+ http::get(slave.get(), "state.json");
AWAIT_READY(response);
EXPECT_SOME_EQ(
@@ -1685,7 +1825,7 @@ TEST_F(SlaveTest, CommandExecutorGracefulShutdown)
// Ensure that a reap will occur within the grace period.
Duration timeout = slave::getExecutorGracePeriod(
flags.executor_shutdown_grace_period);
- EXPECT_GT(timeout, process::MAX_REAP_INTERVAL());
+ EXPECT_GT(timeout, MAX_REAP_INTERVAL());
Fetcher fetcher;
Try<MesosContainerizer*> containerizer = MesosContainerizer::create(
[3/3] mesos git commit: Added metrics for slave shutdowns.
Posted by vi...@apache.org.
Added metrics for slave shutdowns.
Review: https://reviews.apache.org/r/30584
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3048e5e1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3048e5e1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3048e5e1
Branch: refs/heads/master
Commit: 3048e5e1686a5ae0a0f04fd30fdda0a380e9d13d
Parents: 886efef
Author: Vinod Kone <vi...@gmail.com>
Authored: Tue Feb 3 14:34:28 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Feb 4 16:27:49 2015 -0800
----------------------------------------------------------------------
src/master/master.cpp | 125 +++++++++++++++++++++++---------------------
src/master/master.hpp | 4 +-
src/master/metrics.cpp | 12 ++++-
src/master/metrics.hpp | 4 ++
4 files changed, 84 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e42b922..234bbec 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -118,13 +118,15 @@ public:
const SlaveInfo& _slaveInfo,
const SlaveID& _slaveId,
const PID<Master>& _master,
- const Option<shared_ptr<RateLimiter>>& _limiter)
+ const Option<shared_ptr<RateLimiter>>& _limiter,
+ const shared_ptr<Metrics> _metrics)
: ProcessBase(process::ID::generate("slave-observer")),
slave(_slave),
slaveInfo(_slaveInfo),
slaveId(_slaveId),
master(_master),
limiter(_limiter),
+ metrics(_metrics),
timeouts(0),
pinged(false),
connected(true)
@@ -214,6 +216,8 @@ protected:
acquire = limiter.get()->acquire();
}
+ ++metrics->slave_shutdowns_scheduled;
+
shuttingDown = acquire.onAny(defer(self(), &Self::_shutdown));
}
@@ -236,6 +240,8 @@ protected:
} else if (future.isDiscarded()) {
LOG(INFO) << "Canceling shutdown of slave " << slaveId
<< " since a pong is received!";
+
+ ++metrics->slave_shutdowns_canceled;
}
shuttingDown = None();
@@ -247,6 +253,7 @@ private:
const SlaveID slaveId;
const PID<Master> master;
const Option<shared_ptr<RateLimiter>> limiter;
+ shared_ptr<Metrics> metrics;
Option<Future<Nothing>> shuttingDown;
uint32_t timeouts;
bool pinged;
@@ -273,7 +280,7 @@ Master::Master(
contender(_contender),
detector(_detector),
authorizer(_authorizer),
- metrics(*this),
+ metrics(new Metrics(*this)),
electedTime(None())
{
// NOTE: We populate 'info_' here instead of inside 'initialize()'
@@ -977,9 +984,9 @@ void Master::visit(const MessageEvent& event)
// 'Master::Frameworks::principals' for details.
if (principal.isSome()) {
// If the framework has a principal, the counter must exist.
- CHECK(metrics.frameworks.contains(principal.get()));
+ CHECK(metrics->frameworks.contains(principal.get()));
Counter messages_received =
- metrics.frameworks.get(principal.get()).get()->messages_received;
+ metrics->frameworks.get(principal.get()).get()->messages_received;
++messages_received;
}
@@ -987,7 +994,7 @@ void Master::visit(const MessageEvent& event)
if (!elected()) {
VLOG(1) << "Dropping '" << event.message->name << "' message since "
<< "not elected yet";
- ++metrics.dropped_messages;
+ ++metrics->dropped_messages;
return;
}
@@ -1001,7 +1008,7 @@ void Master::visit(const MessageEvent& event)
if (!recovered.get().isReady()) {
VLOG(1) << "Dropping '" << event.message->name << "' message since "
<< "not recovered yet";
- ++metrics.dropped_messages;
+ ++metrics->dropped_messages;
return;
}
@@ -1121,9 +1128,9 @@ void Master::_visit(const MessageEvent& event)
// Note that it could be removed in handling
// 'UnregisterFrameworkMessage' if it's the last framework with
// this principal.
- if (principal.isSome() && metrics.frameworks.contains(principal.get())) {
+ if (principal.isSome() && metrics->frameworks.contains(principal.get())) {
Counter messages_processed =
- metrics.frameworks.get(principal.get()).get()->messages_processed;
+ metrics->frameworks.get(principal.get()).get()->messages_processed;
++messages_processed;
}
}
@@ -1249,7 +1256,7 @@ void Master::recoveredSlavesTimeout(const Registry& registry)
<< " (" << slave.info().hostname() << ") did not re-register "
<< "within the timeout; removing it from the registrar";
- ++metrics.recovery_slave_removals;
+ ++metrics->recovery_slave_removals;
slaves.recovered.erase(slave.info().id());
@@ -1542,7 +1549,7 @@ void Master::registerFramework(
const UPID& from,
const FrameworkInfo& frameworkInfo)
{
- ++metrics.messages_register_framework;
+ ++metrics->messages_register_framework;
if (authenticating.contains(from)) {
// TODO(vinod): Consider dropping this request and fix the tests
@@ -1652,7 +1659,7 @@ void Master::reregisterFramework(
const FrameworkInfo& frameworkInfo,
bool failover)
{
- ++metrics.messages_reregister_framework;
+ ++metrics->messages_reregister_framework;
if (!frameworkInfo.has_id() || frameworkInfo.id() == "") {
LOG(ERROR) << "Framework '" << frameworkInfo.name() << "' at " << from
@@ -1881,7 +1888,7 @@ void Master::unregisterFramework(
const UPID& from,
const FrameworkID& frameworkId)
{
- ++metrics.messages_unregister_framework;
+ ++metrics->messages_unregister_framework;
LOG(INFO) << "Asked to unregister framework " << frameworkId;
@@ -1902,7 +1909,7 @@ void Master::deactivateFramework(
const UPID& from,
const FrameworkID& frameworkId)
{
- ++metrics.messages_deactivate_framework;
+ ++metrics->messages_deactivate_framework;
Framework* framework = getFramework(frameworkId);
@@ -2005,7 +2012,7 @@ void Master::resourceRequest(
const FrameworkID& frameworkId,
const vector<Request>& requests)
{
- ++metrics.messages_resource_request;
+ ++metrics->messages_resource_request;
Framework* framework = getFramework(frameworkId);
@@ -2036,9 +2043,9 @@ void Master::launchTasks(
const vector<OfferID>& offerIds)
{
if (!tasks.empty()) {
- ++metrics.messages_launch_tasks;
+ ++metrics->messages_launch_tasks;
} else {
- ++metrics.messages_decline_offers;
+ ++metrics->messages_decline_offers;
}
Framework* framework = getFramework(frameworkId);
@@ -2235,7 +2242,7 @@ void Master::accept(
"Task launched with invalid offers: " + error.get().message,
TaskStatus::REASON_INVALID_OFFERS);
- metrics.tasks_lost++;
+ metrics->tasks_lost++;
stats.tasks[TASK_LOST]++;
forward(update, UPID(), framework);
@@ -2339,7 +2346,7 @@ void Master::_accept(
TaskStatus::REASON_SLAVE_REMOVED :
TaskStatus::REASON_SLAVE_DISCONNECTED);
- metrics.tasks_lost++;
+ metrics->tasks_lost++;
stats.tasks[TASK_LOST]++;
forward(update, UPID(), framework);
@@ -2472,7 +2479,7 @@ void Master::_accept(
"Not authorized to launch as user '" + user + "'",
TaskStatus::REASON_TASK_UNAUTHORIZED);
- metrics.tasks_error++;
+ metrics->tasks_error++;
stats.tasks[TASK_ERROR]++;
forward(update, UPID(), framework);
@@ -2497,7 +2504,7 @@ void Master::_accept(
validationError.get().message,
TaskStatus::REASON_TASK_INVALID);
- metrics.tasks_error++;
+ metrics->tasks_error++;
stats.tasks[TASK_ERROR]++;
forward(update, UPID(), framework);
@@ -2554,7 +2561,7 @@ void Master::_accept(
void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId)
{
- ++metrics.messages_revive_offers;
+ ++metrics->messages_revive_offers;
Framework* framework = getFramework(frameworkId);
@@ -2582,7 +2589,7 @@ void Master::killTask(
const FrameworkID& frameworkId,
const TaskID& taskId)
{
- ++metrics.messages_kill_task;
+ ++metrics->messages_kill_task;
LOG(INFO) << "Asked to kill task " << taskId
<< " of framework " << frameworkId;
@@ -2668,7 +2675,7 @@ void Master::statusUpdateAcknowledgement(
const TaskID& taskId,
const string& uuid)
{
- metrics.messages_status_update_acknowledgement++;
+ metrics->messages_status_update_acknowledgement++;
// TODO(bmahler): Consider adding a message validator abstraction
// for the master that takes care of all this boilerplate. Ideally
@@ -2684,7 +2691,7 @@ void Master::statusUpdateAcknowledgement(
<< "Ignoring status update acknowledgement message for task " << taskId
<< " of framework " << frameworkId << " on slave " << slaveId
<< " because the framework cannot be found";
- metrics.invalid_status_update_acknowledgements++;
+ metrics->invalid_status_update_acknowledgements++;
return;
}
@@ -2693,7 +2700,7 @@ void Master::statusUpdateAcknowledgement(
<< "Ignoring status update acknowledgement message for task " << taskId
<< " of framework " << *framework << " on slave " << slaveId
<< " because it is not expected from " << from;
- metrics.invalid_status_update_acknowledgements++;
+ metrics->invalid_status_update_acknowledgements++;
return;
}
@@ -2704,7 +2711,7 @@ void Master::statusUpdateAcknowledgement(
<< "Cannot send status update acknowledgement message for task " << taskId
<< " of framework " << *framework << " to slave " << slaveId
<< " because slave is not registered";
- metrics.invalid_status_update_acknowledgements++;
+ metrics->invalid_status_update_acknowledgements++;
return;
}
@@ -2713,7 +2720,7 @@ void Master::statusUpdateAcknowledgement(
<< "Cannot send status update acknowledgement message for task " << taskId
<< " of framework " << *framework << " to slave " << *slave
<< " because slave is disconnected";
- metrics.invalid_status_update_acknowledgements++;
+ metrics->invalid_status_update_acknowledgements++;
return;
}
@@ -2737,7 +2744,7 @@ void Master::statusUpdateAcknowledgement(
<< "Ignoring status update acknowledgement message for task " << taskId
<< " of framework " << *framework << " to slave " << *slave
<< " because it no update was sent by this master";
- metrics.invalid_status_update_acknowledgements++;
+ metrics->invalid_status_update_acknowledgements++;
return;
}
@@ -2760,7 +2767,7 @@ void Master::statusUpdateAcknowledgement(
send(slave->pid, message);
- metrics.valid_status_update_acknowledgements++;
+ metrics->valid_status_update_acknowledgements++;
}
@@ -2771,7 +2778,7 @@ void Master::schedulerMessage(
const ExecutorID& executorId,
const string& data)
{
- ++metrics.messages_framework_to_executor;
+ ++metrics->messages_framework_to_executor;
Framework* framework = getFramework(frameworkId);
@@ -2781,7 +2788,7 @@ void Master::schedulerMessage(
<< " of framework " << frameworkId
<< " because the framework cannot be found";
stats.invalidFrameworkMessages++;
- metrics.invalid_framework_to_executor_messages++;
+ metrics->invalid_framework_to_executor_messages++;
return;
}
@@ -2791,7 +2798,7 @@ void Master::schedulerMessage(
<< " of framework " << *framework
<< " because it is not expected from " << from;
stats.invalidFrameworkMessages++;
- metrics.invalid_framework_to_executor_messages++;
+ metrics->invalid_framework_to_executor_messages++;
return;
}
@@ -2801,7 +2808,7 @@ void Master::schedulerMessage(
<< *framework << " to slave " << slaveId
<< " because slave is not registered";
stats.invalidFrameworkMessages++;
- metrics.invalid_framework_to_executor_messages++;
+ metrics->invalid_framework_to_executor_messages++;
return;
}
@@ -2810,7 +2817,7 @@ void Master::schedulerMessage(
<< *framework << " to slave " << *slave
<< " because slave is disconnected";
stats.invalidFrameworkMessages++;
- metrics.invalid_framework_to_executor_messages++;
+ metrics->invalid_framework_to_executor_messages++;
return;
}
@@ -2825,7 +2832,7 @@ void Master::schedulerMessage(
send(slave->pid, message);
stats.validFrameworkMessages++;
- metrics.valid_framework_to_executor_messages++;
+ metrics->valid_framework_to_executor_messages++;
}
@@ -2835,7 +2842,7 @@ void Master::registerSlave(
const vector<Resource>& checkpointedResources,
const string& version)
{
- ++metrics.messages_register_slave;
+ ++metrics->messages_register_slave;
if (authenticating.contains(from)) {
LOG(INFO) << "Queuing up registration request from " << from
@@ -2953,7 +2960,7 @@ void Master::_registerSlave(
Clock::now(),
checkpointedResources);
- ++metrics.slave_registrations;
+ ++metrics->slave_registrations;
addSlave(slave);
@@ -2976,7 +2983,7 @@ void Master::reregisterSlave(
const vector<Archive::Framework>& completedFrameworks,
const string& version)
{
- ++metrics.messages_reregister_slave;
+ ++metrics->messages_reregister_slave;
if (authenticating.contains(from)) {
LOG(INFO) << "Queuing up re-registration request from " << from
@@ -3149,7 +3156,7 @@ void Master::_reregisterSlave(
slave->reregisteredTime = Clock::now();
- ++metrics.slave_reregistrations;
+ ++metrics->slave_reregistrations;
addSlave(slave, completedFrameworks);
@@ -3198,7 +3205,7 @@ void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks)
void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
{
- ++metrics.messages_unregister_slave;
+ ++metrics->messages_unregister_slave;
LOG(INFO) << "Asked to unregister slave " << slaveId;
@@ -3219,7 +3226,7 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
// because the status updates will be sent by the slave.
void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
{
- ++metrics.messages_status_update;
+ ++metrics->messages_status_update;
if (slaves.removed.get(update.slave_id()).isSome()) {
// If the slave is removed, we have already informed
@@ -3235,7 +3242,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
send(pid, message);
stats.invalidStatusUpdates++;
- metrics.invalid_status_updates++;
+ metrics->invalid_status_updates++;
return;
}
@@ -3246,7 +3253,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
<< " from unknown slave " << pid
<< " with id " << update.slave_id();
stats.invalidStatusUpdates++;
- metrics.invalid_status_updates++;
+ metrics->invalid_status_updates++;
return;
}
@@ -3257,7 +3264,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
<< " from slave " << *slave
<< " because the framework is unknown";
stats.invalidStatusUpdates++;
- metrics.invalid_status_updates++;
+ metrics->invalid_status_updates++;
return;
}
@@ -3272,7 +3279,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
LOG(WARNING) << "Could not lookup task for status update " << update
<< " from slave " << *slave;
stats.invalidStatusUpdates++;
- metrics.invalid_status_updates++;
+ metrics->invalid_status_updates++;
return;
}
@@ -3285,7 +3292,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
}
stats.validStatusUpdates++;
- metrics.valid_status_updates++;
+ metrics->valid_status_updates++;
}
@@ -3319,7 +3326,7 @@ void Master::exitedExecutor(
const ExecutorID& executorId,
int32_t status)
{
- ++metrics.messages_exited_executor;
+ ++metrics->messages_exited_executor;
if (slaves.removed.get(slaveId).isSome()) {
// If the slave is removed, we have already informed
@@ -3394,7 +3401,7 @@ void Master::reconcileTasks(
const FrameworkID& frameworkId,
const std::vector<TaskStatus>& statuses)
{
- ++metrics.messages_reconcile_tasks;
+ ++metrics->messages_reconcile_tasks;
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
@@ -3735,7 +3742,7 @@ void Master::offer(const FrameworkID& frameworkId,
// 'authenticate' message doesn't contain the 'FrameworkID'.
void Master::authenticate(const UPID& from, const UPID& pid)
{
- ++metrics.messages_authenticate;
+ ++metrics->messages_authenticate;
// An authentication request is sent by a client (slave/framework)
// in the following cases:
@@ -4073,8 +4080,8 @@ void Master::addFramework(Framework* framework)
if (principal.isSome()) {
// Create new framework metrics if this framework is the first
// one of this principal. Otherwise existing metrics are reused.
- if (!metrics.frameworks.contains(principal.get())) {
- metrics.frameworks.put(
+ if (!metrics->frameworks.contains(principal.get())) {
+ metrics->frameworks.put(
principal.get(),
Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get())));
}
@@ -4252,8 +4259,8 @@ void Master::removeFramework(Framework* framework)
// Remove the metrics for the principal if this framework is the
// last one with this principal.
if (!frameworks.principals.containsValue(principal.get())) {
- CHECK(metrics.frameworks.contains(principal.get()));
- metrics.frameworks.erase(principal.get());
+ CHECK(metrics->frameworks.contains(principal.get()));
+ metrics->frameworks.erase(principal.get());
}
}
@@ -4321,7 +4328,7 @@ void Master::addSlave(
// Set up an observer for the slave.
slave->observer = new SlaveObserver(
- slave->pid, slave->info, slave->id, self(), slaves.limiter);
+ slave->pid, slave->info, slave->id, self(), slaves.limiter, metrics);
spawn(slave->observer);
@@ -4496,7 +4503,7 @@ void Master::_removeSlave(
LOG(INFO) << "Removed slave " << slaveInfo.id() << " ("
<< slaveInfo.hostname() << ")";
- ++metrics.slave_removals;
+ ++metrics->slave_removals;
// Forward the LOST updates on to the framework.
foreach (const StatusUpdate& update, updates) {
@@ -4611,10 +4618,10 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
}
switch (task->state()) {
- case TASK_FINISHED: ++metrics.tasks_finished; break;
- case TASK_FAILED: ++metrics.tasks_failed; break;
- case TASK_KILLED: ++metrics.tasks_killed; break;
- case TASK_LOST: ++metrics.tasks_lost; break;
+ case TASK_FINISHED: ++metrics->tasks_finished; break;
+ case TASK_FAILED: ++metrics->tasks_failed; break;
+ case TASK_KILLED: ++metrics->tasks_killed; break;
+ case TASK_LOST: ++metrics->tasks_lost; break;
default: break;
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 5a5c86f..dcfd38a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -666,7 +666,9 @@ private:
uint64_t invalidFrameworkMessages;
} stats;
- Metrics metrics;
+ // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
+ // thread safe.
+ memory::shared_ptr<Metrics> metrics;
// Gauge handlers.
double _uptime_secs()
http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/metrics.cpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp
index 956fe50..a5cde16 100644
--- a/src/master/metrics.cpp
+++ b/src/master/metrics.cpp
@@ -148,7 +148,11 @@ Metrics::Metrics(const Master& master)
slave_reregistrations(
"master/slave_reregistrations"),
slave_removals(
- "master/slave_removals")
+ "master/slave_removals"),
+ slave_shutdowns_scheduled(
+ "master/slave_shutdowns_scheduled"),
+ slave_shutdowns_canceled(
+ "master/slave_shutdowns_canceled")
{
// TODO(dhamon): Check return values of 'add'.
process::metrics::add(uptime_secs);
@@ -220,6 +224,9 @@ Metrics::Metrics(const Master& master)
process::metrics::add(slave_reregistrations);
process::metrics::add(slave_removals);
+ process::metrics::add(slave_shutdowns_scheduled);
+ process::metrics::add(slave_shutdowns_canceled);
+
// Create resource gauges.
// TODO(dhamon): Set these up dynamically when adding a slave based on the
// resources the slave exposes.
@@ -319,6 +326,9 @@ Metrics::~Metrics()
process::metrics::remove(slave_reregistrations);
process::metrics::remove(slave_removals);
+ process::metrics::remove(slave_shutdowns_scheduled);
+ process::metrics::remove(slave_shutdowns_canceled);
+
foreach (const process::metrics::Gauge& gauge, resources_total) {
process::metrics::remove(gauge);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/metrics.hpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp
index 6a43abc..5e18f88 100644
--- a/src/master/metrics.hpp
+++ b/src/master/metrics.hpp
@@ -156,6 +156,10 @@ struct Metrics
process::metrics::Counter slave_reregistrations;
process::metrics::Counter slave_removals;
+ // Slave observer metrics.
+ process::metrics::Counter slave_shutdowns_scheduled;
+ process::metrics::Counter slave_shutdowns_canceled;
+
// Resource metrics.
std::vector<process::metrics::Gauge> resources_total;
std::vector<process::metrics::Gauge> resources_used;