Posted to commits@mesos.apache.org by vi...@apache.org on 2015/02/05 01:28:07 UTC

[2/3] mesos git commit: Rate limited the removal of slaves failing health checks.

Rate limited the removal of slaves failing health checks.

Review: https://reviews.apache.org/r/30514


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/886efefc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/886efefc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/886efefc

Branch: refs/heads/master
Commit: 886efefc5f294b3ea22c1fa2ce70a9e9324eca19
Parents: fafccbd
Author: Vinod Kone <vi...@gmail.com>
Authored: Thu Jan 29 11:51:02 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Feb 4 16:27:49 2015 -0800

----------------------------------------------------------------------
 src/master/flags.hpp          |  10 ++
 src/master/master.cpp         | 104 +++++++++++++++++--
 src/master/master.hpp         |   6 ++
 src/tests/partition_tests.cpp |   2 +-
 src/tests/slave_tests.cpp     | 204 +++++++++++++++++++++++++++++++------
 5 files changed, 287 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index 6c18a1a..e9f6fff 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -147,6 +147,15 @@ public:
         "Values: [0%-100%]",
         stringify(RECOVERY_SLAVE_REMOVAL_PERCENT_LIMIT * 100.0) + "%");
 
+    // TODO(vinod): Add a 'Rate' abstraction in stout and the
+    // corresponding parser for flags.
+    add(&Flags::slave_removal_rate_limit,
+        "slave_removal_rate_limit",
+        "The maximum rate (e.g., 1/10mins, 2/3hrs, etc) at which slaves will\n"
+        "be removed from the master when they fail health checks. By default\n"
+        "slaves will be removed as soon as they fail the health checks.\n"
+        "The value is of the form <Number of slaves>/<Duration>.");
+
     add(&Flags::webui_dir,
         "webui_dir",
         "Directory path of the webui files/assets",
@@ -377,6 +386,7 @@ public:
   bool log_auto_initialize;
   Duration slave_reregister_timeout;
   std::string recovery_slave_removal_limit;
+  Option<std::string> slave_removal_rate_limit;
   std::string webui_dir;
   std::string whitelist;
   std::string user_sorter;

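For reference, the flag value takes the form <Number of slaves>/<Duration>; for example, passing --slave_removal_rate_limit=1/10mins to the master would allow at most one unhealthy slave to be removed every ten minutes. The sketch below condenses how such a value can be turned into a libprocess RateLimiter using the same stout helpers (strings::tokenize, numify, Duration::parse) that the master uses further down; createLimiter is a hypothetical helper name and the EXIT(1) error handling of the real code is omitted.

#include <string>
#include <vector>

#include <process/limiter.hpp>

#include <stout/duration.hpp>
#include <stout/memory.hpp>
#include <stout/numify.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>

// Sketch only: turn a value like "1/10mins" into a limiter that
// grants at most one removal permit every ten minutes.
memory::shared_ptr<process::RateLimiter> createLimiter(
    const std::string& value)
{
  std::vector<std::string> tokens = strings::tokenize(value, "/");

  Try<int> permits = numify<int>(tokens[0]);            // e.g., 1
  Try<Duration> duration = Duration::parse(tokens[1]);  // e.g., 10mins

  // Error handling is elided; malformed values make the real
  // Master::initialize() exit.
  return memory::shared_ptr<process::RateLimiter>(
      new process::RateLimiter(permits.get(), duration.get()));
}
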
http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d5c4beb..e42b922 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -94,6 +94,7 @@ using process::Owned;
 using process::PID;
 using process::Process;
 using process::Promise;
+using process::RateLimiter;
 using process::Shared;
 using process::Time;
 using process::Timer;
@@ -116,12 +117,14 @@ public:
   SlaveObserver(const UPID& _slave,
                 const SlaveInfo& _slaveInfo,
                 const SlaveID& _slaveId,
-                const PID<Master>& _master)
+                const PID<Master>& _master,
+                const Option<shared_ptr<RateLimiter>>& _limiter)
     : ProcessBase(process::ID::generate("slave-observer")),
       slave(_slave),
       slaveInfo(_slaveInfo),
       slaveId(_slaveId),
       master(_master),
+      limiter(_limiter),
       timeouts(0),
       pinged(false),
       connected(true)
@@ -167,23 +170,75 @@ protected:
   {
     timeouts = 0;
     pinged = false;
+
+    // Cancel any pending shutdown.
+    if (shuttingDown.isSome()) {
+      // Need a copy for non-const access.
+      Future<Nothing> future = shuttingDown.get();
+      future.discard();
+    }
   }
 
   void timeout()
   {
-    if (pinged) { // So we haven't got back a pong yet ...
-      if (++timeouts >= MAX_SLAVE_PING_TIMEOUTS) {
+    if (pinged) {
+      timeouts++; // No pong has been received before the timeout.
+      if (timeouts >= MAX_SLAVE_PING_TIMEOUTS) {
+        // No pong has been received for the last
+        // 'MAX_SLAVE_PING_TIMEOUTS' pings.
         shutdown();
-        return;
       }
     }
 
+    // NOTE: We keep pinging even if we schedule a shutdown. This is
+    // because if the slave eventually responds to a ping, we can
+    // cancel the shutdown.
     ping();
   }
 
+  // NOTE: The shutdown of the slave is rate limited and can be
+  // canceled if a pong is received before the actual shutdown is
+  // called.
   void shutdown()
   {
-    dispatch(master, &Master::shutdownSlave, slaveId, "health check timed out");
+    if (shuttingDown.isSome()) {
+      return;  // Shutdown is already in progress.
+    }
+
+    Future<Nothing> acquire = Nothing();
+
+    if (limiter.isSome()) {
+      LOG(INFO) << "Scheduling shutdown of slave " << slaveId
+                << " due to health check timeout";
+
+      acquire = limiter.get()->acquire();
+    }
+
+    shuttingDown = acquire.onAny(defer(self(), &Self::_shutdown));
+  }
+
+  void _shutdown()
+  {
+    CHECK_SOME(shuttingDown);
+
+    const Future<Nothing>& future = shuttingDown.get();
+
+    CHECK(!future.isFailed());
+
+    if (future.isReady()) {
+      LOG(INFO) << "Shutting down slave " << slaveId
+                << " due to health check timeout";
+
+      dispatch(master,
+               &Master::shutdownSlave,
+               slaveId,
+               "health check timed out");
+    } else if (future.isDiscarded()) {
+      LOG(INFO) << "Canceling shutdown of slave " << slaveId
+                << " since a pong is received!";
+    }
+
+    shuttingDown = None();
   }
 
 private:
@@ -191,6 +246,8 @@ private:
   const SlaveInfo slaveInfo;
   const SlaveID slaveId;
   const PID<Master> master;
+  const Option<shared_ptr<RateLimiter>> limiter;
+  Option<Future<Nothing>> shuttingDown;
   uint32_t timeouts;
   bool pinged;
   bool connected;
@@ -423,6 +480,41 @@ void Master::initialize()
     LOG(INFO) << "Framework rate limiting enabled";
   }
 
+  if (flags.slave_removal_rate_limit.isSome()) {
+    LOG(INFO) << "Slave removal is rate limited to "
+              << flags.slave_removal_rate_limit.get();
+
+    // Parse the flag value.
+    // TODO(vinod): Move this parsing logic to flags once we have a
+    // 'Rate' abstraction in stout.
+    vector<string> tokens =
+      strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+    if (tokens.size() != 2) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>";
+    }
+
+    Try<int> permits = numify<int>(tokens[0]);
+    if (permits.isError()) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>"
+              << ": " << permits.error();
+    }
+
+    Try<Duration> duration = Duration::parse(tokens[1]);
+    if (duration.isError()) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>"
+              << ": " << duration.error();
+    }
+
+    slaves.limiter = new RateLimiter(permits.get(), duration.get());
+  }
+
   hashmap<string, RoleInfo> roleInfos;
 
   // Add the default role.
@@ -4229,7 +4321,7 @@ void Master::addSlave(
 
   // Set up an observer for the slave.
   slave->observer = new SlaveObserver(
-      slave->pid, slave->info, slave->id, self());
+      slave->pid, slave->info, slave->id, self(), slaves.limiter);
 
   spawn(slave->observer);
 

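To see the scheduling/cancellation mechanism above in isolation: a shutdown is "scheduled" by asking the shared RateLimiter for a permit, and it is "canceled" by discarding the returned future before the permit is granted (the real _shutdown() then sees a discarded future and logs that the shutdown was canceled). The following is a minimal, hypothetical sketch of that pattern, not the SlaveObserver code itself; the one-permit-per-ten-minutes rate is an arbitrary example.

#include <process/future.hpp>
#include <process/limiter.hpp>

#include <stout/duration.hpp>
#include <stout/none.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>

using process::Future;
using process::RateLimiter;

int main()
{
  // Illustrative rate: at most 1 permit every 10 minutes.
  RateLimiter limiter(1, Minutes(10));

  // "Scheduling" a shutdown: the future becomes ready once the
  // limiter grants a permit.
  Option<Future<Nothing>> shuttingDown = limiter.acquire();

  // ... a pong arrives before the permit is granted ...

  if (shuttingDown.isSome()) {
    Future<Nothing> future = shuttingDown.get();  // Copy for non-const access.

    // Discarding the future cancels the pending shutdown; this has
    // no effect if the permit was already granted, which is why
    // _shutdown() above checks isReady() vs. isDiscarded().
    future.discard();

    shuttingDown = None();
  }

  return 0;
}
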
http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d98e1b6..5a5c86f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -585,6 +585,12 @@ private:
     // TODO(bmahler): Ideally we could use a cache with set semantics.
     Cache<SlaveID, Nothing> removed;
 
+    // This rate limiter is used to limit the removal of slaves failing
+    // health checks.
+    // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
+    // a wrapper around a libprocess process, which is thread safe.
+    Option<memory::shared_ptr<process::RateLimiter>> limiter;
+
     bool transitioning(const Option<SlaveID>& slaveId)
     {
       if (slaveId.isSome()) {

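As the NOTE explains, a single RateLimiter can safely be shared: copying the memory::shared_ptr hands every SlaveObserver a handle to the same underlying RateLimiterProcess, which serializes the acquire() calls. Below is a small hypothetical sketch of that sharing pattern; the variable names and the 1/10mins rate are illustrative only.

#include <process/limiter.hpp>

#include <stout/duration.hpp>
#include <stout/memory.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>

using memory::shared_ptr;
using process::RateLimiter;

int main()
{
  // Unset unless --slave_removal_rate_limit was given, mirroring
  // 'slaves.limiter' above.
  Option<shared_ptr<RateLimiter>> limiter = None();

  // Created once in the master (an arbitrary 1/10mins rate here)...
  limiter = shared_ptr<RateLimiter>(new RateLimiter(1, Minutes(10)));

  // ...and copied into each SlaveObserver. The copies are cheap and
  // thread safe because they all refer to the same limiter process.
  Option<shared_ptr<RateLimiter>> observerCopy = limiter;

  return 0;
}
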
http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index fea7801..b3af282 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -64,7 +64,7 @@ class PartitionTest : public MesosTest {};
 
 
 // This test checks that a scheduler gets a slave lost
-// message for a partioned slave.
+// message for a partitioned slave.
 TEST_F(PartitionTest, PartitionedSlave)
 {
   Try<PID<Master> > master = StartMaster();

http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index e7e2af6..956ce64 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -64,6 +64,8 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
+using namespace process;
+
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::Containerizer;
@@ -73,12 +75,6 @@ using mesos::internal::slave::MesosContainerizer;
 using mesos::internal::slave::MesosContainerizerProcess;
 using mesos::internal::slave::Slave;
 
-using process::Clock;
-using process::Future;
-using process::Message;
-using process::Owned;
-using process::PID;
-
 using std::map;
 using std::string;
 using std::vector;
@@ -151,7 +147,7 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
   tasks.push_back(task);
 
   // Drop the registration message from the executor to the slave.
-  Future<process::Message> registerExecutor =
+  Future<Message> registerExecutor =
     DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
 
   driver.launchTasks(offers.get()[0].id(), tasks);
@@ -234,7 +230,7 @@ TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)
   tasks.push_back(task);
 
   // Drop the registration message from the executor to the slave.
-  Future<process::Message> registerExecutorMessage =
+  Future<Message> registerExecutorMessage =
     DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
 
   driver.launchTasks(offers.get()[0].id(), tasks);
@@ -322,7 +318,7 @@ TEST_F(SlaveTest, CommandExecutorWithOverride)
   // Expect wait after launch is called but don't return anything
   // until after we've finished everything below.
   Future<Nothing> wait;
-  process::Promise<containerizer::Termination> promise;
+  Promise<containerizer::Termination> promise;
   EXPECT_CALL(containerizer, wait(_))
     .WillOnce(DoAll(FutureSatisfy(&wait),
                     Return(promise.future())));
@@ -359,12 +355,12 @@ TEST_F(SlaveTest, CommandExecutorWithOverride)
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2));
 
-  Try<process::Subprocess> executor =
-    process::subprocess(
+  Try<Subprocess> executor =
+    subprocess(
         executorCommand,
-        process::Subprocess::PIPE(),
-        process::Subprocess::PIPE(),
-        process::Subprocess::PIPE(),
+        Subprocess::PIPE(),
+        Subprocess::PIPE(),
+        Subprocess::PIPE(),
         environment);
 
   ASSERT_SOME(executor);
@@ -728,13 +724,13 @@ TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement)
     .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   // We need to grab this message to get the scheduler's pid.
-  Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+  Future<Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
       Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
 
   schedDriver.start();
 
   AWAIT_READY(frameworkRegisteredMessage);
-  const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+  const UPID schedulerPid = frameworkRegisteredMessage.get().to;
 
   AWAIT_READY(offers);
   EXPECT_NE(0u, offers.get().size());
@@ -776,10 +772,9 @@ TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement)
   AWAIT_READY(acknowledgementMessage);
 
   // Send the acknowledgement to the slave with a non-leading master.
-  process::post(
-      process::UPID("master@localhost:1"),
-      slave.get(),
-      acknowledgementMessage.get());
+  post(process::UPID("master@localhost:1"),
+       slave.get(),
+       acknowledgementMessage.get());
 
   // Make sure the acknowledgement was ignored.
   Clock::settle();
@@ -821,8 +816,8 @@ TEST_F(SlaveTest, MetricsInStatsEndpoint)
   Try<PID<Slave> > slave = StartSlave();
   ASSERT_SOME(slave);
 
-  Future<process::http::Response> response =
-    process::http::get(slave.get(), "stats.json");
+  Future<http::Response> response =
+    http::get(slave.get(), "stats.json");
 
   AWAIT_READY(response);
 
@@ -891,8 +886,8 @@ TEST_F(SlaveTest, StateEndpoint)
   Try<PID<Slave> > slave = StartSlave(flags);
   ASSERT_SOME(slave);
 
-  Future<process::http::Response> response =
-    process::http::get(slave.get(), "state.json");
+  Future<http::Response> response =
+    http::get(slave.get(), "state.json");
 
   AWAIT_READY(response);
 
@@ -1009,7 +1004,7 @@ TEST_F(SlaveTest, TerminatingSlaveDoesNotReregister)
 
   // Send a ShutdownMessage instead of calling Stop() directly
   // to avoid blocking.
-  process::post(master.get(), slave.get(), ShutdownMessage());
+  post(master.get(), slave.get(), ShutdownMessage());
 
   // Advance the clock to trigger doReliableRegistration().
   Clock::advance(slave::REGISTER_RETRY_INTERVAL_MAX * 2);
@@ -1096,7 +1091,7 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
 
   // Set up the containerizer so the next update() will fail.
   EXPECT_CALL(containerizer, update(_, _))
-    .WillOnce(Return(process::Failure("update() failed")))
+    .WillOnce(Return(Failure("update() failed")))
     .WillRepeatedly(Return(Nothing()));
 
   EXPECT_CALL(exec, killTask(_, _))
@@ -1201,6 +1196,151 @@ TEST_F(SlaveTest, PingTimeoutSomePings)
 }
 
 
+// This test ensures that when a slave removal rate limit is specified,
+// a slave that fails health checks is removed only after a permit is
+// acquired from the rate limiter.
+TEST_F(SlaveTest, RateLimitSlaveShutdown)
+{
+  // Start a master.
+  master::Flags flags = CreateMasterFlags();
+  flags.slave_removal_rate_limit = "1/1secs";
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  // Set these expectations up before we spawn the slave so that we
+  // don't miss the first PING.
+  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+
+  // Drop all the PONGs to simulate health check timeout.
+  DROP_MESSAGES(Eq("PONG"), _, _);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Start a slave.
+  Try<PID<Slave> > slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  Future<Nothing> acquire =
+    FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+
+  Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+  // Now advance through the PINGs.
+  Clock::pause();
+  uint32_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+    Clock::advance(master::SLAVE_PING_TIMEOUT);
+  }
+
+  Clock::advance(master::SLAVE_PING_TIMEOUT);
+  Clock::settle();
+
+  // Master should acquire the permit for shutting down the slave.
+  AWAIT_READY(acquire);
+
+  // Master should shut down the slave.
+  AWAIT_READY(shutdown);
+}
+
+
+// This test verifies that when a slave responds to pings after the
+// slave observer has scheduled it for shutdown (due to health check
+// failure), the shutdown is canceled.
+TEST_F(SlaveTest, CancelSlaveShutdown)
+{
+  // Start a master.
+  master::Flags flags = CreateMasterFlags();
+  // Interval between slave removals.
+  Duration interval = master::SLAVE_PING_TIMEOUT * 10;
+  flags.slave_removal_rate_limit = "1/" + stringify(interval);
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  // Set these expectations up before we spawn the slave so that we
+  // don't miss the first PING.
+  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+
+  // Drop all the PONGs to simulate health check timeout.
+  DROP_MESSAGES(Eq("PONG"), _, _);
+
+  // NOTE: We start two slaves in this test so that the rate limiter
+  // used by the slave observers gives out 2 permits. The first permit
+  // is satisfied immediately. Since the 2nd permit will be
+  // enqueued, its corresponding future can be discarded before it
+  // becomes ready.
+  // TODO(vinod): Inject a rate limiter into 'Master' instead to
+  // simplify the test.
+
+  // Start the first slave.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage1 =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<PID<Slave> > slave1 = StartSlave();
+  ASSERT_SOME(slave1);
+
+  AWAIT_READY(slaveRegisteredMessage1);
+
+  // Start the second slave.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage2 =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<PID<Slave> > slave2 = StartSlave();
+  ASSERT_SOME(slave2);
+
+  AWAIT_READY(slaveRegisteredMessage2);
+
+  Future<Nothing> acquire1 =
+    FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+
+  Future<Nothing> acquire2 =
+    FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+
+  Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+  // Now advance through the PINGs until shutdown permits are given
+  // out for both the slaves.
+  Clock::pause();
+  while (true) {
+    AWAIT_READY(ping);
+    if (acquire1.isReady() && acquire2.isReady()) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+    Clock::advance(master::SLAVE_PING_TIMEOUT);
+  }
+  Clock::settle();
+
+  // The slave that first timed out should be shut down right away.
+  AWAIT_READY(shutdown);
+
+  // Ensure the 2nd slave's shutdown is canceled.
+  EXPECT_NO_FUTURE_PROTOBUFS(ShutdownMessage(), _, _);
+
+  // Reset the filters to allow pongs from the 2nd slave.
+  filter(NULL);
+
+  // Advance clock enough to do a ping pong.
+  Clock::advance(master::SLAVE_PING_TIMEOUT);
+  Clock::settle();
+
+  // Now advance the clock to the time the 2nd permit is acquired.
+  Clock::advance(interval);
+
+  // Settle the clock to give a chance for the master to shut down
+  // the 2nd slave (it shouldn't in this test).
+  Clock::settle();
+}
+
+
 // This test ensures that a killTask() can happen between runTask()
 // and _runTask() and then gets "handled properly". This means that
 // the task never gets started, but also does not get lost. The end
@@ -1220,7 +1360,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   StandaloneMasterDetector detector(master.get());
 
   MockSlave slave(CreateSlaveFlags(), &detector, &containerizer);
-  process::spawn(slave);
+  spawn(slave);
 
   MockScheduler sched;
   MesosSchedulerDriver driver(
@@ -1311,8 +1451,8 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   driver.stop();
   driver.join();
 
-  process::terminate(slave);
-  process::wait(slave);
+  terminate(slave);
+  wait(slave);
 
   Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
 }
@@ -1497,8 +1637,8 @@ TEST_F(SlaveTest, TaskLabels)
   AWAIT_READY(update);
 
   // Verify label key and value in slave state.json.
-  Future<process::http::Response> response =
-    process::http::get(slave.get(), "state.json");
+  Future<http::Response> response =
+    http::get(slave.get(), "state.json");
   AWAIT_READY(response);
 
   EXPECT_SOME_EQ(
@@ -1685,7 +1825,7 @@ TEST_F(SlaveTest, CommandExecutorGracefulShutdown)
   // Ensure that a reap will occur within the grace period.
   Duration timeout = slave::getExecutorGracePeriod(
       flags.executor_shutdown_grace_period);
-  EXPECT_GT(timeout, process::MAX_REAP_INTERVAL());
+  EXPECT_GT(timeout, MAX_REAP_INTERVAL());
 
   Fetcher fetcher;
   Try<MesosContainerizer*> containerizer = MesosContainerizer::create(