Posted to commits@mesos.apache.org by vi...@apache.org on 2015/02/05 01:28:07 UTC
[2/3] mesos git commit: Rate limited the removal of slaves failing health checks.
Rate limited the removal of slaves failing health checks.
Review: https://reviews.apache.org/r/30514
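For example, a master started with the flag below (a hypothetical
invocation; the binary name and path depend on the build) will remove at
most 2 slaves per 20 minutes when they fail health checks, queueing any
further removals behind the rate limiter:

  ./bin/mesos-master.sh --slave_removal_rate_limit=2/20mins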
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/886efefc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/886efefc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/886efefc
Branch: refs/heads/master
Commit: 886efefc5f294b3ea22c1fa2ce70a9e9324eca19
Parents: fafccbd
Author: Vinod Kone <vi...@gmail.com>
Authored: Thu Jan 29 11:51:02 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Feb 4 16:27:49 2015 -0800
----------------------------------------------------------------------
src/master/flags.hpp          |  10 ++
src/master/master.cpp         | 104 +++++++++++++++++--
src/master/master.hpp         |   6 ++
src/tests/partition_tests.cpp |   2 +-
src/tests/slave_tests.cpp     | 204 +++++++++++++++++++++++++++++++------
5 files changed, 287 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index 6c18a1a..e9f6fff 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -147,6 +147,15 @@ public:
"Values: [0%-100%]",
stringify(RECOVERY_SLAVE_REMOVAL_PERCENT_LIMIT * 100.0) + "%");
+ // TODO(vinod): Add a 'Rate' abstraction in stout and the
+ // corresponding parser for flags.
+ add(&Flags::slave_removal_rate_limit,
+ "slave_removal_rate_limit",
+ "The maximum rate (e.g., 1/10mins, 2/3hrs, etc) at which slaves will\n"
+ "be removed from the master when they fail health checks. By default\n"
+ "slaves will be removed as soon as they fail the health checks.\n"
+ "The value is of the form <Number of slaves>/<Duration>.");
+
add(&Flags::webui_dir,
"webui_dir",
"Directory path of the webui files/assets",
@@ -377,6 +386,7 @@ public:
bool log_auto_initialize;
Duration slave_reregister_timeout;
std::string recovery_slave_removal_limit;
+ Option<std::string> slave_removal_rate_limit;
std::string webui_dir;
std::string whitelist;
std::string user_sorter;
http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d5c4beb..e42b922 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -94,6 +94,7 @@ using process::Owned;
using process::PID;
using process::Process;
using process::Promise;
+using process::RateLimiter;
using process::Shared;
using process::Time;
using process::Timer;
@@ -116,12 +117,14 @@ public:
SlaveObserver(const UPID& _slave,
const SlaveInfo& _slaveInfo,
const SlaveID& _slaveId,
- const PID<Master>& _master)
+ const PID<Master>& _master,
+ const Option<shared_ptr<RateLimiter>>& _limiter)
: ProcessBase(process::ID::generate("slave-observer")),
slave(_slave),
slaveInfo(_slaveInfo),
slaveId(_slaveId),
master(_master),
+ limiter(_limiter),
timeouts(0),
pinged(false),
connected(true)
@@ -167,23 +170,75 @@ protected:
{
timeouts = 0;
pinged = false;
+
+ // Cancel any pending shutdown.
+ if (shuttingDown.isSome()) {
+ // Need a copy for non-const access.
+ Future<Nothing> future = shuttingDown.get();
+ future.discard();
+ }
}
void timeout()
{
- if (pinged) { // So we haven't got back a pong yet ...
- if (++timeouts >= MAX_SLAVE_PING_TIMEOUTS) {
+ if (pinged) {
+ timeouts++; // No pong has been received before the timeout.
+ if (timeouts >= MAX_SLAVE_PING_TIMEOUTS) {
+ // No pong has been received for the last
+ // 'MAX_SLAVE_PING_TIMEOUTS' pings.
shutdown();
- return;
}
}
+ // NOTE: We keep pinging even if we schedule a shutdown. This is
+ // because if the slave eventually responds to a ping, we can
+ // cancel the shutdown.
ping();
}
+ // NOTE: The shutdown of the slave is rate limited and can be
+ // canceled if a pong is received before the actual shutdown is
+ // called.
void shutdown()
{
- dispatch(master, &Master::shutdownSlave, slaveId, "health check timed out");
+ if (shuttingDown.isSome()) {
+ return; // Shutdown is already in progress.
+ }
+
+ Future<Nothing> acquire = Nothing();
+
+ if (limiter.isSome()) {
+ LOG(INFO) << "Scheduling shutdown of slave " << slaveId
+ << " due to health check timeout";
+
+ acquire = limiter.get()->acquire();
+ }
+
+ shuttingDown = acquire.onAny(defer(self(), &Self::_shutdown));
+ }
+
+ void _shutdown()
+ {
+ CHECK_SOME(shuttingDown);
+
+ const Future<Nothing>& future = shuttingDown.get();
+
+ CHECK(!future.isFailed());
+
+ if (future.isReady()) {
+ LOG(INFO) << "Shutting down slave " << slaveId
+ << " due to health check timeout";
+
+ dispatch(master,
+ &Master::shutdownSlave,
+ slaveId,
+ "health check timed out");
+ } else if (future.isDiscarded()) {
+ LOG(INFO) << "Canceling shutdown of slave " << slaveId
+ << " since a pong is received!";
+ }
+
+ shuttingDown = None();
}
private:
@@ -191,6 +246,8 @@ private:
const SlaveInfo slaveInfo;
const SlaveID slaveId;
const PID<Master> master;
+ const Option<shared_ptr<RateLimiter>> limiter;
+ Option<Future<Nothing>> shuttingDown;
uint32_t timeouts;
bool pinged;
bool connected;
@@ -423,6 +480,41 @@ void Master::initialize()
LOG(INFO) << "Framework rate limiting enabled";
}
+ if (flags.slave_removal_rate_limit.isSome()) {
+ LOG(INFO) << "Slave removal is rate limited to "
+ << flags.slave_removal_rate_limit.get();
+
+ // Parse the flag value.
+ // TODO(vinod): Move this parsing logic to flags once we have a
+ // 'Rate' abstraction in stout.
+ vector<string> tokens =
+ strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+ if (tokens.size() != 2) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>";
+ }
+
+ Try<int> permits = numify<int>(tokens[0]);
+ if (permits.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << permits.error();
+ }
+
+ Try<Duration> duration = Duration::parse(tokens[1]);
+ if (duration.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << duration.error();
+ }
+
+ slaves.limiter = new RateLimiter(permits.get(), duration.get());
+ }
+
hashmap<string, RoleInfo> roleInfos;
// Add the default role.
@@ -4229,7 +4321,7 @@ void Master::addSlave(
// Set up an observer for the slave.
slave->observer = new SlaveObserver(
- slave->pid, slave->info, slave->id, self());
+ slave->pid, slave->info, slave->id, self(), slaves.limiter);
spawn(slave->observer);
http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d98e1b6..5a5c86f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -585,6 +585,12 @@ private:
// TODO(bmahler): Ideally we could use a cache with set semantics.
Cache<SlaveID, Nothing> removed;
+ // This rate limiter is used to limit the removal of slaves failing
+ // health checks.
+ // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
+ // a wrapper around a libprocess process, which is thread safe.
+ Option<memory::shared_ptr<process::RateLimiter>> limiter;
+
bool transitioning(const Option<SlaveID>& slaveId)
{
if (slaveId.isSome()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index fea7801..b3af282 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -64,7 +64,7 @@ class PartitionTest : public MesosTest {};
// This test checks that a scheduler gets a slave lost
-// message for a partioned slave.
+// message for a partitioned slave.
TEST_F(PartitionTest, PartitionedSlave)
{
Try<PID<Master> > master = StartMaster();
http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index e7e2af6..956ce64 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -64,6 +64,8 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
+using namespace process;
+
using mesos::internal::master::Master;
using mesos::internal::slave::Containerizer;
@@ -73,12 +75,6 @@ using mesos::internal::slave::MesosContainerizer;
using mesos::internal::slave::MesosContainerizerProcess;
using mesos::internal::slave::Slave;
-using process::Clock;
-using process::Future;
-using process::Message;
-using process::Owned;
-using process::PID;
-
using std::map;
using std::string;
using std::vector;
@@ -151,7 +147,7 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
tasks.push_back(task);
// Drop the registration message from the executor to the slave.
- Future<process::Message> registerExecutor =
+ Future<Message> registerExecutor =
DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
driver.launchTasks(offers.get()[0].id(), tasks);
@@ -234,7 +230,7 @@ TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)
tasks.push_back(task);
// Drop the registration message from the executor to the slave.
- Future<process::Message> registerExecutorMessage =
+ Future<Message> registerExecutorMessage =
DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
driver.launchTasks(offers.get()[0].id(), tasks);
@@ -322,7 +318,7 @@ TEST_F(SlaveTest, CommandExecutorWithOverride)
// Expect wait after launch is called but don't return anything
// until after we've finished everything below.
Future<Nothing> wait;
- process::Promise<containerizer::Termination> promise;
+ Promise<containerizer::Termination> promise;
EXPECT_CALL(containerizer, wait(_))
.WillOnce(DoAll(FutureSatisfy(&wait),
Return(promise.future())));
@@ -359,12 +355,12 @@ TEST_F(SlaveTest, CommandExecutorWithOverride)
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
- Try<process::Subprocess> executor =
- process::subprocess(
+ Try<Subprocess> executor =
+ subprocess(
executorCommand,
- process::Subprocess::PIPE(),
- process::Subprocess::PIPE(),
- process::Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
environment);
ASSERT_SOME(executor);
@@ -728,13 +724,13 @@ TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement)
.WillRepeatedly(Return()); // Ignore subsequent offers.
// We need to grab this message to get the scheduler's pid.
- Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+ Future<Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
schedDriver.start();
AWAIT_READY(frameworkRegisteredMessage);
- const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+ const UPID schedulerPid = frameworkRegisteredMessage.get().to;
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
@@ -776,10 +772,9 @@ TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement)
AWAIT_READY(acknowledgementMessage);
// Send the acknowledgement to the slave with a non-leading master.
- process::post(
- process::UPID("master@localhost:1"),
- slave.get(),
- acknowledgementMessage.get());
+ post(process::UPID("master@localhost:1"),
+ slave.get(),
+ acknowledgementMessage.get());
// Make sure the acknowledgement was ignored.
Clock::settle();
@@ -821,8 +816,8 @@ TEST_F(SlaveTest, MetricsInStatsEndpoint)
Try<PID<Slave> > slave = StartSlave();
ASSERT_SOME(slave);
- Future<process::http::Response> response =
- process::http::get(slave.get(), "stats.json");
+ Future<http::Response> response =
+ http::get(slave.get(), "stats.json");
AWAIT_READY(response);
@@ -891,8 +886,8 @@ TEST_F(SlaveTest, StateEndpoint)
Try<PID<Slave> > slave = StartSlave(flags);
ASSERT_SOME(slave);
- Future<process::http::Response> response =
- process::http::get(slave.get(), "state.json");
+ Future<http::Response> response =
+ http::get(slave.get(), "state.json");
AWAIT_READY(response);
@@ -1009,7 +1004,7 @@ TEST_F(SlaveTest, TerminatingSlaveDoesNotReregister)
// Send a ShutdownMessage instead of calling Stop() directly
// to avoid blocking.
- process::post(master.get(), slave.get(), ShutdownMessage());
+ post(master.get(), slave.get(), ShutdownMessage());
// Advance the clock to trigger doReliableRegistration().
Clock::advance(slave::REGISTER_RETRY_INTERVAL_MAX * 2);
@@ -1096,7 +1091,7 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
// Set up the containerizer so the next update() will fail.
EXPECT_CALL(containerizer, update(_, _))
- .WillOnce(Return(process::Failure("update() failed")))
+ .WillOnce(Return(Failure("update() failed")))
.WillRepeatedly(Return(Nothing()));
EXPECT_CALL(exec, killTask(_, _))
@@ -1201,6 +1196,151 @@ TEST_F(SlaveTest, PingTimeoutSomePings)
}
+// This test ensures that when a slave removal rate limit is specified,
+// a slave that fails health checks is removed after a permit is
+// acquired from the rate limiter.
+TEST_F(SlaveTest, RateLimitSlaveShutdown)
+{
+ // Start a master.
+ master::Flags flags = CreateMasterFlags();
+ flags.slave_removal_rate_limit = "1/1secs";
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ // Set these expectations up before we spawn the slave so that we
+ // don't miss the first PING.
+ Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+
+ // Drop all the PONGs to simulate health check timeout.
+ DROP_MESSAGES(Eq("PONG"), _, _);
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ // Start a slave.
+ Try<PID<Slave> > slave = StartSlave();
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ Future<Nothing> acquire =
+ FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+
+ Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+ // Now advance through the PINGs.
+ Clock::pause();
+ uint32_t pings = 0;
+ while (true) {
+ AWAIT_READY(ping);
+ pings++;
+ if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+ break;
+ }
+ ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ }
+
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
+
+ // Master should acquire the permit for shutting down the slave.
+ AWAIT_READY(acquire);
+
+ // Master should shut down the slave.
+ AWAIT_READY(shutdown);
+}
+
+
+// This test verifies that when a slave responds to pings after the
+// slave observer has scheduled it for shutdown (due to health check
+// failure), the shutdown is canceled.
+TEST_F(SlaveTest, CancelSlaveShutdown)
+{
+ // Start a master.
+ master::Flags flags = CreateMasterFlags();
+ // Interval between slave removals.
+ Duration interval = master::SLAVE_PING_TIMEOUT * 10;
+ flags.slave_removal_rate_limit = "1/" + stringify(interval);
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ // Set these expectations up before we spawn the slave so that we
+ // don't miss the first PING.
+ Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+
+ // Drop all the PONGs to simulate health check timeout.
+ DROP_MESSAGES(Eq("PONG"), _, _);
+
+ // NOTE: We start two slaves in this test so that the rate limiter
+ // used by the slave observers gives out 2 permits. The first permit
+ // is satisfied immediately. Since the 2nd permit will be enqueued,
+ // its corresponding future can be discarded before it becomes ready.
+ // TODO(vinod): Inject a rate limiter into 'Master' instead to
+ // simplify the test.
+
+ // Start the first slave.
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage1 =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ Try<PID<Slave> > slave1 = StartSlave();
+ ASSERT_SOME(slave1);
+
+ AWAIT_READY(slaveRegisteredMessage1);
+
+ // Start the second slave.
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage2 =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ Try<PID<Slave> > slave2 = StartSlave();
+ ASSERT_SOME(slave2);
+
+ AWAIT_READY(slaveRegisteredMessage2);
+
+ Future<Nothing> acquire1 =
+ FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+
+ Future<Nothing> acquire2 =
+ FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+
+ Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+ // Now advance through the PINGs until shutdown permits are given
+ // out for both slaves.
+ Clock::pause();
+ while (true) {
+ AWAIT_READY(ping);
+ if (acquire1.isReady() && acquire2.isReady()) {
+ break;
+ }
+ ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ }
+ Clock::settle();
+
+ // The slave that first timed out should be shut down right away.
+ AWAIT_READY(shutdown);
+
+ // Ensure the 2nd slave's shutdown is canceled.
+ EXPECT_NO_FUTURE_PROTOBUFS(ShutdownMessage(), _, _);
+
+ // Reset the filters to allow pongs from the 2nd slave.
+ filter(NULL);
+
+ // Advance the clock enough to do a ping-pong.
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
+
+ // Now advance the clock to the time the 2nd permit is acquired.
+ Clock::advance(interval);
+
+ // Settle the clock to give a chance for the master to shut down
+ // the 2nd slave (it shouldn't in this test).
+ Clock::settle();
+}
+
+
// This test ensures that a killTask() can happen between runTask()
// and _runTask() and then gets "handled properly". This means that
// the task never gets started, but also does not get lost. The end
@@ -1220,7 +1360,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
StandaloneMasterDetector detector(master.get());
MockSlave slave(CreateSlaveFlags(), &detector, &containerizer);
- process::spawn(slave);
+ spawn(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
@@ -1311,8 +1451,8 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
driver.stop();
driver.join();
- process::terminate(slave);
- process::wait(slave);
+ terminate(slave);
+ wait(slave);
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
@@ -1497,8 +1637,8 @@ TEST_F(SlaveTest, TaskLabels)
AWAIT_READY(update);
// Verify label key and value in slave state.json.
- Future<process::http::Response> response =
- process::http::get(slave.get(), "state.json");
+ Future<http::Response> response =
+ http::get(slave.get(), "state.json");
AWAIT_READY(response);
EXPECT_SOME_EQ(
@@ -1685,7 +1825,7 @@ TEST_F(SlaveTest, CommandExecutorGracefulShutdown)
// Ensure that a reap will occur within the grace period.
Duration timeout = slave::getExecutorGracePeriod(
flags.executor_shutdown_grace_period);
- EXPECT_GT(timeout, process::MAX_REAP_INTERVAL());
+ EXPECT_GT(timeout, MAX_REAP_INTERVAL());
Fetcher fetcher;
Try<MesosContainerizer*> containerizer = MesosContainerizer::create(