You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/02/28 03:07:08 UTC

[1/5] mesos git commit: Added virtual to RateLimiter methods to enable mocking.

Repository: mesos
Updated Branches:
  refs/heads/master bc7a24cf3 -> d55a7239c


Added virtual to RateLimiter methods to enable mocking.

Review: https://reviews.apache.org/r/31511


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1e8f55da
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1e8f55da
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1e8f55da

Branch: refs/heads/master
Commit: 1e8f55daa348500cf2634a2671431d6d0b901059
Parents: bc7a24c
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 26 15:30:42 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Feb 27 16:28:50 2015 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/limiter.hpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1e8f55da/3rdparty/libprocess/include/process/limiter.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/limiter.hpp b/3rdparty/libprocess/include/process/limiter.hpp
index 4bf24a8..444aa1b 100644
--- a/3rdparty/libprocess/include/process/limiter.hpp
+++ b/3rdparty/libprocess/include/process/limiter.hpp
@@ -32,11 +32,11 @@ class RateLimiter
 public:
   RateLimiter(int permits, const Duration& duration);
   explicit RateLimiter(double permitsPerSecond);
-  ~RateLimiter();
+  virtual ~RateLimiter();
 
   // Returns a future that becomes ready when the permit is acquired.
   // Discarding this future cancels this acquisition.
-  Future<Nothing> acquire();
+  virtual Future<Nothing> acquire();
 
 private:
   // Not copyable, not assignable.


[5/5] mesos git commit: Added tests for rate limiting slave removals during master failover.

Posted by bm...@apache.org.
Added tests for rate limiting slave removals during master failover.

Review: https://reviews.apache.org/r/31516


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d55a7239
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d55a7239
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d55a7239

Branch: refs/heads/master
Commit: d55a7239cf0707ec291d73e30b69a07f7ee845d3
Parents: 26546a4
Author: Benjamin Mahler <be...@gmail.com>
Authored: Wed Feb 25 17:29:14 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Feb 27 17:52:40 2015 -0800

----------------------------------------------------------------------
 src/tests/master_tests.cpp | 173 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 173 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d55a7239/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 5692f07..3af20e9 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -37,6 +37,7 @@
 #include <process/metrics/metrics.hpp>
 
 #include <stout/json.hpp>
+#include <stout/memory.hpp>
 #include <stout/net.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
@@ -56,9 +57,12 @@
 #include "slave/containerizer/mesos/containerizer.hpp"
 
 #include "tests/containerizer.hpp"
+#include "tests/limiter.hpp"
 #include "tests/mesos.hpp"
 #include "tests/utils.hpp"
 
+using memory::shared_ptr;
+
 using mesos::internal::master::Master;
 
 using mesos::internal::master::allocator::MesosAllocatorProcess;
@@ -72,6 +76,7 @@ using process::Clock;
 using process::Future;
 using process::Owned;
 using process::PID;
+using process::Promise;
 using process::UPID;
 
 using std::string;
@@ -1843,6 +1848,174 @@ TEST_F(MasterTest, NonStrictRegistryWriteOnly)
 }
 
 
+// This test ensures that slave removals during master recovery
+// are rate limited.
+TEST_F(MasterTest, RateLimitRecoveredSlaveRemoval)
+{
+  // Start a master.
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<PID<Master> > master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), _);
+
+  // Start a slave.
+  Try<PID<Slave> > slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Stop the slave while the master is down.
+  this->Stop(master.get());
+  this->Stop(slave.get());
+
+  shared_ptr<MockRateLimiter> slaveRemovalLimiter(new MockRateLimiter());
+
+  // Return a pending future from the rate limiter.
+  Future<Nothing> acquire;
+  Promise<Nothing> promise;
+  EXPECT_CALL(*slaveRemovalLimiter, acquire())
+    .WillOnce(DoAll(FutureSatisfy(&acquire),
+                    Return(promise.future())));
+
+  // Restart the master.
+  master = StartMaster(slaveRemovalLimiter, masterFlags);
+  ASSERT_SOME(master);
+
+  // Start a scheduler to ensure the master would notify
+  // a framework about slave removal.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+    &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillRepeatedly(FutureSatisfy(&slaveLost));
+
+  driver.start();
+
+  AWAIT_READY(registered);
+
+  // Trigger the slave re-registration timeout.
+  Clock::pause();
+  Clock::advance(masterFlags.slave_reregister_timeout);
+
+  // The master should attempt to acquire a permit.
+  AWAIT_READY(acquire);
+
+  // The removal should not occur before the permit is satisfied.
+  Clock::settle();
+  ASSERT_TRUE(slaveLost.isPending());
+
+  // Once the permit is satisfied, the slave should be removed.
+  promise.set(Nothing());
+  AWAIT_READY(slaveLost);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test ensures that slave removals that get scheduled during
+// master recovery can be canceled if the slave re-registers.
+TEST_F(MasterTest, CancelRecoveredSlaveRemoval)
+{
+  // Start a master.
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<PID<Master> > master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), _);
+
+  // Start a slave with checkpointing.
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.checkpoint = true;
+  slaveFlags.recover = "reconnect";
+  slaveFlags.strict = true;
+  Try<PID<Slave> > slave = StartSlave(slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Stop the slave while the master is down.
+  this->Stop(master.get());
+  this->Stop(slave.get());
+
+  shared_ptr<MockRateLimiter> slaveRemovalLimiter(new MockRateLimiter());
+
+  // Return a pending future from the rate limiter.
+  Future<Nothing> acquire;
+  Promise<Nothing> promise;
+  EXPECT_CALL(*slaveRemovalLimiter, acquire())
+    .WillOnce(DoAll(FutureSatisfy(&acquire),
+                    Return(promise.future())));
+
+  // Restart the master.
+  master = StartMaster(slaveRemovalLimiter, masterFlags);
+  ASSERT_SOME(master);
+
+  // Start a scheduler to ensure the master would notify
+  // a framework about slave removal.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+    &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillRepeatedly(FutureSatisfy(&slaveLost));
+
+  driver.start();
+
+  AWAIT_READY(registered);
+
+  // Trigger the slave re-registration timeout.
+  Clock::pause();
+  Clock::advance(masterFlags.slave_reregister_timeout);
+
+  // The master should attempt to acquire a permit.
+  AWAIT_READY(acquire);
+
+  // The removal should not occur before the permit is satisfied.
+  Clock::settle();
+  ASSERT_TRUE(slaveLost.isPending());
+
+  // Ignore resource offers from the re-registered slave.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(Return());
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get(), _);
+
+  // Restart the slave.
+  slave = StartSlave(slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Satisfy the rate limit permit. Ensure a removal does not occur!
+  promise.set(Nothing());
+  Clock::settle();
+  ASSERT_TRUE(slaveLost.isPending());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
 // This test ensures that when a slave is recovered from the registry
 // and re-registers with the master, it is *not* removed after the
 // re-registration timeout elapses.


[4/5] mesos git commit: Added rate limiting to slave removals during master failover.

Posted by bm...@apache.org.
Added rate limiting to slave removals during master failover.

Review: https://reviews.apache.org/r/31515


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/26546a4b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/26546a4b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/26546a4b

Branch: refs/heads/master
Commit: 26546a4bb8dd64479cd1dfd1d450b37c59e280cc
Parents: 52c64bd
Author: Benjamin Mahler <be...@gmail.com>
Authored: Tue Feb 24 17:33:55 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Feb 27 17:52:39 2015 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 95 ++++++++++++++++++++++++++++++++--------------
 src/master/master.hpp |  4 ++
 2 files changed, 71 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/26546a4b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 4a1b428..53c8696 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -220,9 +220,8 @@ protected:
       acquire = limiter.get()->acquire();
     }
 
-    ++metrics->slave_shutdowns_scheduled;
-
     shuttingDown = acquire.onAny(defer(self(), &Self::_shutdown));
+    ++metrics->slave_shutdowns_scheduled;
   }
 
   void _shutdown()
@@ -1227,40 +1226,80 @@ void Master::recoveredSlavesTimeout(const Registry& registry)
             << " investigate or increase this limit to proceed further";
   }
 
+  // Remove the slaves in a rate limited manner, similar to how the
+  // SlaveObserver removes slaves.
   foreach (const Registry::Slave& slave, registry.slaves().slaves()) {
-    if (!slaves.recovered.contains(slave.info().id())) {
-      continue; // Slave re-registered.
+    Future<Nothing> acquire = Nothing();
+
+    if (slaves.limiter.isSome()) {
+      LOG(INFO) << "Scheduling removal of slave "
+                << slave.info().id() << " (" << slave.info().hostname() << ")"
+                << "; did not re-register within "
+                << flags.slave_reregister_timeout << " after master failover";
+
+      acquire = slaves.limiter.get()->acquire();
     }
 
-    LOG(WARNING) << "Slave " << slave.info().id()
-                 << " (" << slave.info().hostname() << ") did not re-register "
-                 << "within the timeout; removing it from the registrar";
+    // Need to disambiguate for the compiler.
+    // TODO(bmahler): With C++11, just call removeSlave from within
+    // a lambda function to avoid the need to disambiguate.
+    Nothing (Master::*removeSlave)(const Registry::Slave&) = &Self::removeSlave;
+    const string failure = "Slave removal rate limit acquisition failed";
 
-    ++metrics->recovery_slave_removals;
+    acquire
+      .then(defer(self(), removeSlave, slave))
+      .onFailed(lambda::bind(fail, failure, lambda::_1))
+      .onDiscarded(lambda::bind(fail, failure, "discarded"));
 
-    slaves.recovered.erase(slave.info().id());
+    ++metrics->slave_shutdowns_scheduled;
+  }
+}
 
-    if (flags.registry_strict) {
-      slaves.removing.insert(slave.info().id());
 
-      registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
-        .onAny(defer(self(),
-                     &Self::_removeSlave,
-                     slave.info(),
-                     vector<StatusUpdate>(), // No TASK_LOST updates to send.
-                     lambda::_1));
-    } else {
-      // When a non-strict registry is in use, we want to ensure the
-      // registry is used in a write-only manner. Therefore we remove
-      // the slave from the registry but we do not inform the
-      // framework.
-      const string& message =
-        "Failed to remove slave " + stringify(slave.info().id());
-
-      registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
-        .onFailed(lambda::bind(fail, message, lambda::_1));
-    }
+Nothing Master::removeSlave(const Registry::Slave& slave)
+{
+  // The slave is removed from 'recovered' when it re-registers.
+  if (!slaves.recovered.contains(slave.info().id())) {
+    LOG(INFO) << "Canceling removal of slave "
+              << slave.info().id() << " (" << slave.info().hostname() << ")"
+              << " since it re-registered!";
+
+    ++metrics->slave_shutdowns_canceled;
+
+    return Nothing();
   }
+
+  LOG(WARNING) << "Slave " << slave.info().id()
+               << " (" << slave.info().hostname() << ") did not re-register"
+               << " within " << flags.slave_reregister_timeout
+               << " after master failover; removing it from the registrar";
+
+  ++metrics->recovery_slave_removals;
+
+  slaves.recovered.erase(slave.info().id());
+
+  if (flags.registry_strict) {
+    slaves.removing.insert(slave.info().id());
+
+    registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
+      .onAny(defer(self(),
+                   &Self::_removeSlave,
+                   slave.info(),
+                   vector<StatusUpdate>(), // No TASK_LOST updates to send.
+                   lambda::_1));
+  } else {
+    // When a non-strict registry is in use, we want to ensure the
+    // registry is used in a write-only manner. Therefore we remove
+    // the slave from the registry but we do not inform the
+    // framework.
+    const string& message =
+      "Failed to remove slave " + stringify(slave.info().id());
+
+    registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
+      .onFailed(lambda::bind(fail, message, lambda::_1));
+  }
+
+  return Nothing();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/26546a4b/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 3e8a8dc..ce0e0b3 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -362,6 +362,10 @@ protected:
       const std::vector<Archive::Framework>& completedFrameworks =
         std::vector<Archive::Framework>());
 
+  // Remove the slave from the registrar. Called when the slave
+  // does not re-register in time after a master failover.
+  Nothing removeSlave(const Registry::Slave& slave);
+
   // Remove the slave from the registrar and from the master's state.
   void removeSlave(Slave* slave);
   void _removeSlave(


[3/5] mesos git commit: Improved slave removal rate limiting tests.

Posted by bm...@apache.org.
Improved slave removal rate limiting tests.

Review: https://reviews.apache.org/r/31514


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/52c64bdb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/52c64bdb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/52c64bdb

Branch: refs/heads/master
Commit: 52c64bdbf85357b5cec8098da13fa7cf20cbb1cf
Parents: ad09e90
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 26 15:31:56 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Feb 27 17:41:41 2015 -0800

----------------------------------------------------------------------
 src/Makefile.am           |   1 +
 src/tests/limiter.hpp     |  46 +++++++++++++++++
 src/tests/mesos.cpp       |  15 ++++++
 src/tests/mesos.hpp       |   6 +++
 src/tests/slave_tests.cpp | 113 ++++++++++++++++++++---------------------
 5 files changed, 122 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/52c64bdb/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 17d0d7a..d299f07 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -550,6 +550,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
 	tests/flags.hpp							\
 	tests/isolator.hpp						\
 	tests/launcher.hpp						\
+	tests/limiter.hpp						\
 	tests/mesos.hpp							\
 	tests/module.hpp						\
 	tests/script.hpp						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/52c64bdb/src/tests/limiter.hpp
----------------------------------------------------------------------
diff --git a/src/tests/limiter.hpp b/src/tests/limiter.hpp
new file mode 100644
index 0000000..1bdf8ae
--- /dev/null
+++ b/src/tests/limiter.hpp
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __TEST_LIMITER_HPP__
+#define __TEST_LIMITER_HPP__
+
+#include <gmock/gmock.h>
+
+#include <process/limiter.hpp>
+
+#include <stout/duration.hpp>
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class MockRateLimiter : public process::RateLimiter
+{
+public:
+  // We initialize the rate limiter to 1 per second because it has to
+  // be non-zero, but this value has no effect since this is a mock.
+  MockRateLimiter() : process::RateLimiter(1, Seconds(1)) {}
+
+  MOCK_METHOD0(acquire, process::Future<Nothing>());
+};
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __TEST_LIMITER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/52c64bdb/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 23f790c..c8f43d2 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -21,6 +21,7 @@
 #include <stout/check.hpp>
 #include <stout/foreach.hpp>
 #include <stout/json.hpp>
+#include <stout/memory.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 #include <stout/result.hpp>
@@ -47,6 +48,8 @@
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
 
+using memory::shared_ptr;
+
 using std::string;
 using testing::_;
 using testing::Invoke;
@@ -209,6 +212,18 @@ Try<PID<master::Master> > MesosTest::StartMaster(
 }
 
 
+Try<PID<master::Master>> MesosTest::StartMaster(
+    const shared_ptr<MockRateLimiter>& slaveRemovalLimiter,
+    const Option<master::Flags>& flags)
+{
+  return cluster.masters.start(
+      flags.isNone() ? CreateMasterFlags() : flags.get(),
+      None(),
+      None(),
+      slaveRemovalLimiter);
+}
+
+
 Try<PID<slave::Slave> > MesosTest::StartSlave(
     const Option<slave::Flags>& flags)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/52c64bdb/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index f7a0d05..e91e5e4 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -57,6 +57,7 @@
 #include "slave/containerizer/mesos/containerizer.hpp"
 
 #include "tests/cluster.hpp"
+#include "tests/limiter.hpp"
 #include "tests/utils.hpp"
 
 #ifdef MESOS_HAS_JAVA
@@ -105,6 +106,11 @@ protected:
       Authorizer* authorizer,
       const Option<master::Flags>& flags = None());
 
+  // Starts a master with a slave removal rate limiter and flags.
+  virtual Try<process::PID<master::Master> > StartMaster(
+      const memory::shared_ptr<MockRateLimiter>& slaveRemovalLimiter,
+      const Option<master::Flags>& flags = None());
+
   // TODO(bmahler): Consider adding a builder style interface, e.g.
   //
   // Try<PID<Slave> > slave =

http://git-wip-us.apache.org/repos/asf/mesos/blob/52c64bdb/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 7ea012a..a74ff7b 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -36,6 +36,7 @@
 #include <process/pid.hpp>
 #include <process/subprocess.hpp>
 
+#include <stout/memory.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/try.hpp>
@@ -56,8 +57,11 @@
 
 #include "tests/containerizer.hpp"
 #include "tests/flags.hpp"
+#include "tests/limiter.hpp"
 #include "tests/mesos.hpp"
 
+using memory::shared_ptr;
+
 using namespace mesos::internal::slave;
 
 using namespace process;
@@ -1302,14 +1306,13 @@ TEST_F(SlaveTest, PingTimeoutSomePings)
 
 
 // This test ensures that when slave removal rate limit is specified
-// a slave that fails health checks is removed after acquiring permit
-// from the rate limiter.
+// a slave that fails health checks is removed after a permit is
+// provided by the rate limiter.
 TEST_F(SlaveTest, RateLimitSlaveShutdown)
 {
   // Start a master.
-  master::Flags flags = CreateMasterFlags();
-  flags.slave_removal_rate_limit = "1/1secs";
-  Try<PID<Master> > master = StartMaster(flags);
+  shared_ptr<MockRateLimiter> slaveRemovalLimiter(new MockRateLimiter());
+  Try<PID<Master> > master = StartMaster(slaveRemovalLimiter);
   ASSERT_SOME(master);
 
   // Set these expectations up before we spawn the slave so that we
@@ -1328,31 +1331,39 @@ TEST_F(SlaveTest, RateLimitSlaveShutdown)
 
   AWAIT_READY(slaveRegisteredMessage);
 
-  Future<Nothing> acquire =
-    FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+  // Return a pending future from the rate limiter.
+  Future<Nothing> acquire;
+  Promise<Nothing> promise;
+  EXPECT_CALL(*slaveRemovalLimiter, acquire())
+    .WillOnce(DoAll(FutureSatisfy(&acquire),
+                    Return(promise.future())));
 
   Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _);
 
-  // Now advance through the PINGs.
+  // Induce a health check failure of the slave.
   Clock::pause();
   uint32_t pings = 0;
   while (true) {
     AWAIT_READY(ping);
     pings++;
     if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+      Clock::advance(master::SLAVE_PING_TIMEOUT);
       break;
     }
     ping = FUTURE_MESSAGE(Eq("PING"), _, _);
     Clock::advance(master::SLAVE_PING_TIMEOUT);
   }
 
-  Clock::advance(master::SLAVE_PING_TIMEOUT);
-  Clock::settle();
-
-  // Master should acquire the permit for shutting down the slave.
+  // The master should attempt to acquire a permit.
   AWAIT_READY(acquire);
 
-  // Master should shutdown the slave.
+  // The shutdown should not occur before the permit is satisfied.
+  Clock::settle();
+  ASSERT_TRUE(shutdown.isPending());
+
+  // Once the permit is satisfied, the shutdown message
+  // should be sent.
+  promise.set(Nothing());
   AWAIT_READY(shutdown);
 }
 
@@ -1363,11 +1374,8 @@ TEST_F(SlaveTest, RateLimitSlaveShutdown)
 TEST_F(SlaveTest, CancelSlaveShutdown)
 {
   // Start a master.
-  master::Flags flags = CreateMasterFlags();
-  // Interval between slave removals.
-  Duration interval = master::SLAVE_PING_TIMEOUT * 10;
-  flags.slave_removal_rate_limit = "1/" + stringify(interval);
-  Try<PID<Master> > master = StartMaster(flags);
+  shared_ptr<MockRateLimiter> slaveRemovalLimiter(new MockRateLimiter());
+  Try<PID<Master> > master = StartMaster(slaveRemovalLimiter);
   ASSERT_SOME(master);
 
   // Set these expectations up before we spawn the slave so that we
@@ -1377,71 +1385,58 @@ TEST_F(SlaveTest, CancelSlaveShutdown)
   // Drop all the PONGs to simulate health check timeout.
   DROP_MESSAGES(Eq("PONG"), _, _);
 
-  // NOTE: We start two slaves in this test so that the rate limiter
-  // used by the slave observers gives out 2 permits. The first permit
-  // gets satisfied immediately. And since the 2nd permit will be
-  // enqueued, it's corresponding future can be discarded before it
-  // becomes ready.
-  // TODO(vinod): Inject a rate limiter into 'Master' instead to
-  // simplify the test.
-
-  // Start the first slave.
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage1 =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  Try<PID<Slave> > slave1 = StartSlave();
-  ASSERT_SOME(slave1);
-
-  AWAIT_READY(slaveRegisteredMessage1);
+  // No shutdown should occur during the test!
+  EXPECT_NO_FUTURE_PROTOBUFS(ShutdownMessage(), _, _);
 
-  // Start the second slave.
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage2 =
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
 
-  Try<PID<Slave> > slave2 = StartSlave();
-  ASSERT_SOME(slave2);
-
-  AWAIT_READY(slaveRegisteredMessage2);
-
-  Future<Nothing> acquire1 =
-    FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+  // Start a slave.
+  Try<PID<Slave> > slave = StartSlave();
+  ASSERT_SOME(slave);
 
-  Future<Nothing> acquire2 =
-    FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+  AWAIT_READY(slaveRegisteredMessage);
 
-  Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+  // Return a pending future from the rate limiter.
+  Future<Nothing> acquire;
+  Promise<Nothing> promise;
+  EXPECT_CALL(*slaveRemovalLimiter, acquire())
+    .WillOnce(DoAll(FutureSatisfy(&acquire),
+                    Return(promise.future())));
 
-  // Now advance through the PINGs until shutdown permits are given
-  // out for both the slaves.
+  // Induce a health check failure of the slave.
   Clock::pause();
+  uint32_t pings = 0;
   while (true) {
     AWAIT_READY(ping);
-    if (acquire1.isReady() && acquire2.isReady()) {
+    pings++;
+    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+      Clock::advance(master::SLAVE_PING_TIMEOUT);
       break;
     }
     ping = FUTURE_MESSAGE(Eq("PING"), _, _);
     Clock::advance(master::SLAVE_PING_TIMEOUT);
   }
-  Clock::settle();
 
-  // The slave that first timed out should be shutdown right away.
-  AWAIT_READY(shutdown);
+  // The master should attempt to acquire a permit.
+  AWAIT_READY(acquire);
 
-  // Ensure the 2nd slave's shutdown is canceled.
-  EXPECT_NO_FUTURE_PROTOBUFS(ShutdownMessage(), _, _);
+  // Settle to make sure the shutdown does not occur.
+  Clock::settle();
 
-  // Reset the filters to allow pongs from the 2nd slave.
+  // Reset the filters to allow pongs from the slave.
   filter(NULL);
 
   // Advance clock enough to do a ping pong.
   Clock::advance(master::SLAVE_PING_TIMEOUT);
   Clock::settle();
 
-  // Now advance the clock to the time the 2nd permit is acquired.
-  Clock::advance(interval);
+  // The master should have tried to cancel the removal.
+  ASSERT_TRUE(promise.future().hasDiscard());
 
-  // Settle the clock to give a chance for the master to shutdown
-  // the 2nd slave (it shouldn't in this test).
+  // Allow the cancelation and settle the clock to ensure a shutdown
+  // does not occur.
+  promise.discard();
   Clock::settle();
 }
 


[2/5] mesos git commit: Added ability to inject the RateLimiter for slave removals.

Posted by bm...@apache.org.
Added ability to inject the RateLimiter for slave removals.

Review: https://reviews.apache.org/r/31512


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ad09e909
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ad09e909
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ad09e909

Branch: refs/heads/master
Commit: ad09e9095bcd4015b7ded07cde4e0e6ac8e1948e
Parents: 1e8f55d
Author: Benjamin Mahler <be...@gmail.com>
Authored: Wed Feb 25 18:26:19 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Feb 27 16:28:51 2015 -0800

----------------------------------------------------------------------
 src/local/local.cpp   | 41 +++++++++++++++++++++++++++++++++++
 src/master/main.cpp   | 43 +++++++++++++++++++++++++++++++++++++
 src/master/master.cpp | 37 ++++++--------------------------
 src/master/master.hpp |  3 +++
 src/tests/cluster.hpp | 53 +++++++++++++++++++++++++++++++++++++++++++---
 5 files changed, 143 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 8189edb..1908336 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -19,15 +19,19 @@
 #include <map>
 #include <set>
 #include <sstream>
+#include <string>
 #include <vector>
 
 #include <mesos/module/anonymous.hpp>
 
+#include <process/limiter.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 
+#include <stout/duration.hpp>
 #include <stout/exit.hpp>
 #include <stout/foreach.hpp>
+#include <stout/memory.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 #include <stout/try.hpp>
@@ -66,6 +70,8 @@
 #include "state/protobuf.hpp"
 #include "state/storage.hpp"
 
+using memory::shared_ptr;
+
 using namespace mesos::internal;
 using namespace mesos::internal::log;
 
@@ -87,6 +93,7 @@ using mesos::modules::ModuleManager;
 
 using process::Owned;
 using process::PID;
+using process::RateLimiter;
 using process::UPID;
 
 using std::map;
@@ -200,6 +207,39 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
       authorizer = authorizer__.release();
     }
 
+    Option<shared_ptr<RateLimiter>> slaveRemovalLimiter = None();
+    if (flags.slave_removal_rate_limit.isSome()) {
+      // Parse the flag value.
+      // TODO(vinod): Move this parsing logic to flags once we have a
+      // 'Rate' abstraction in stout.
+      vector<string> tokens =
+        strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+      if (tokens.size() != 2) {
+        EXIT(1) << "Invalid slave_removal_rate_limit: "
+                << flags.slave_removal_rate_limit.get()
+                << ". Format is <Number of slaves>/<Duration>";
+      }
+
+      Try<int> permits = numify<int>(tokens[0]);
+      if (permits.isError()) {
+        EXIT(1) << "Invalid slave_removal_rate_limit: "
+                << flags.slave_removal_rate_limit.get()
+                << ". Format is <Number of slaves>/<Duration>"
+                << ": " << permits.error();
+      }
+
+      Try<Duration> duration = Duration::parse(tokens[1]);
+      if (duration.isError()) {
+        EXIT(1) << "Invalid slave_removal_rate_limit: "
+                << flags.slave_removal_rate_limit.get()
+                << ". Format is <Number of slaves>/<Duration>"
+                << ": " << duration.error();
+      }
+
+      slaveRemovalLimiter = new RateLimiter(permits.get(), duration.get());
+    }
+
     // Create anonymous modules.
     foreach (const string& name, ModuleManager::find<Anonymous>()) {
       Try<Anonymous*> create = ModuleManager::create<Anonymous>(name);
@@ -224,6 +264,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
         contender,
         detector,
         authorizer,
+        slaveRemovalLimiter,
         flags);
 
     detector->appoint(master->info());

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index f202019..7cce3a0 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -19,17 +19,22 @@
 #include <stdint.h>
 
 #include <set>
+#include <string>
+#include <vector>
 
 #include <mesos/mesos.hpp>
 
 #include <mesos/module/anonymous.hpp>
 
+#include <process/limiter.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 
 #include <stout/check.hpp>
+#include <stout/duration.hpp>
 #include <stout/exit.hpp>
 #include <stout/flags.hpp>
+#include <stout/memory.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
@@ -72,12 +77,15 @@ using namespace mesos::internal::log;
 using namespace mesos::internal::master;
 using namespace zookeeper;
 
+using memory::shared_ptr;
+
 using mesos::MasterInfo;
 
 using mesos::modules::Anonymous;
 using mesos::modules::ModuleManager;
 
 using process::Owned;
+using process::RateLimiter;
 using process::UPID;
 
 using std::cerr;
@@ -85,6 +93,7 @@ using std::cout;
 using std::endl;
 using std::set;
 using std::string;
+using std::vector;
 
 
 void usage(const char* argv0, const flags::FlagsBase& flags)
@@ -287,6 +296,39 @@ int main(int argc, char** argv)
     authorizer = authorizer__.release();
   }
 
+  Option<shared_ptr<RateLimiter>> slaveRemovalLimiter = None();
+  if (flags.slave_removal_rate_limit.isSome()) {
+    // Parse the flag value.
+    // TODO(vinod): Move this parsing logic to flags once we have a
+    // 'Rate' abstraction in stout.
+    vector<string> tokens =
+      strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+    if (tokens.size() != 2) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>";
+    }
+
+    Try<int> permits = numify<int>(tokens[0]);
+    if (permits.isError()) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>"
+              << ": " << permits.error();
+    }
+
+    Try<Duration> duration = Duration::parse(tokens[1]);
+    if (duration.isError()) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>"
+              << ": " << duration.error();
+    }
+
+    slaveRemovalLimiter = new RateLimiter(permits.get(), duration.get());
+  }
+
   // Create anonymous modules.
   foreach (const string& name, ModuleManager::find<Anonymous>()) {
     Try<Anonymous*> create = ModuleManager::create<Anonymous>(name);
@@ -314,6 +356,7 @@ int main(int argc, char** argv)
       contender,
       detector,
       authorizer,
+      slaveRemovalLimiter,
       flags);
 
   if (zk.isNone()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 76e217d..4a1b428 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -273,6 +273,7 @@ Master::Master(
     MasterContender* _contender,
     MasterDetector* _detector,
     const Option<Authorizer*>& _authorizer,
+    const Option<shared_ptr<RateLimiter>>& _slaveRemovalLimiter,
     const Flags& _flags)
   : ProcessBase("master"),
     http(this),
@@ -287,6 +288,8 @@ Master::Master(
     metrics(new Metrics(*this)),
     electedTime(None())
 {
+  slaves.limiter = _slaveRemovalLimiter;
+
   // NOTE: We populate 'info_' here instead of inside 'initialize()'
   // because 'StandaloneMasterDetector' needs access to the info.
 
@@ -489,39 +492,11 @@ void Master::initialize()
     LOG(INFO) << "Framework rate limiting enabled";
   }
 
-  if (flags.slave_removal_rate_limit.isSome()) {
+  // If the rate limiter is injected for testing,
+  // the flag may not be set.
+  if (slaves.limiter.isSome() && flags.slave_removal_rate_limit.isSome()) {
     LOG(INFO) << "Slave removal is rate limited to "
               << flags.slave_removal_rate_limit.get();
-
-    // Parse the flag value.
-    // TODO(vinod): Move this parsing logic to flags once we have a
-    // 'Rate' abstraction in stout.
-    vector<string> tokens =
-      strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
-
-    if (tokens.size() != 2) {
-      EXIT(1) << "Invalid slave_removal_rate_limit: "
-              << flags.slave_removal_rate_limit.get()
-              << ". Format is <Number of slaves>/<Duration>";
-    }
-
-    Try<int> permits = numify<int>(tokens[0]);
-    if (permits.isError()) {
-      EXIT(1) << "Invalid slave_removal_rate_limit: "
-              << flags.slave_removal_rate_limit.get()
-              << ". Format is <Number of slaves>/<Duration>"
-              << ": " << permits.error();
-    }
-
-    Try<Duration> duration = Duration::parse(tokens[1]);
-    if (duration.isError()) {
-      EXIT(1) << "Invalid slave_removal_rate_limit: "
-              << flags.slave_removal_rate_limit.get()
-              << ". Format is <Number of slaves>/<Duration>"
-              << ": " << duration.error();
-    }
-
-    slaves.limiter = new RateLimiter(permits.get(), duration.get());
   }
 
   hashmap<string, RoleInfo> roleInfos;

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e288cdb..3e8a8dc 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -34,6 +34,7 @@
 
 #include <mesos/module/authenticator.hpp>
 
+#include <process/limiter.hpp>
 #include <process/http.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
@@ -104,6 +105,8 @@ public:
          MasterContender* contender,
          MasterDetector* detector,
          const Option<Authorizer*>& authorizer,
+         const Option<memory::shared_ptr<process::RateLimiter>>&
+           slaveRemovalLimiter,
          const Flags& flags = Flags());
 
   virtual ~Master();

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index da242d9..a56b654 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -20,6 +20,8 @@
 #define __TESTS_CLUSTER_HPP__
 
 #include <map>
+#include <string>
+#include <vector>
 
 #include <mesos/mesos.hpp>
 
@@ -27,13 +29,16 @@
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
+#include <process/limiter.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 #include <process/process.hpp>
 
+#include <stout/duration.hpp>
 #include <stout/error.hpp>
 #include <stout/foreach.hpp>
 #include <stout/none.hpp>
+#include <stout/memory.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/path.hpp>
@@ -101,7 +106,9 @@ public:
     Try<process::PID<master::Master> > start(
         const master::Flags& flags = master::Flags(),
         const Option<master::allocator::Allocator*>& allocator = None(),
-        const Option<Authorizer*>& authorizer = None());
+        const Option<Authorizer*>& authorizer = None(),
+        const Option<memory::shared_ptr<process::RateLimiter> >&
+          slaveRemovalLimiter = None());
 
     // Stops and cleans up a master at the specified PID.
     Try<Nothing> stop(const process::PID<master::Master>& pid);
@@ -137,6 +144,8 @@ public:
 
       process::Owned<Authorizer> authorizer;
 
+      Option<memory::shared_ptr<process::RateLimiter>> slaveRemovalLimiter;
+
       master::Master* master;
     };
 
@@ -246,7 +255,8 @@ inline void Cluster::Masters::shutdown()
 inline Try<process::PID<master::Master> > Cluster::Masters::start(
     const master::Flags& flags,
     const Option<master::allocator::Allocator*>& allocator,
-    const Option<Authorizer*>& authorizer)
+    const Option<Authorizer*>& authorizer,
+    const Option<memory::shared_ptr<process::RateLimiter>>& slaveRemovalLimiter)
 {
   // Disallow multiple masters when not using ZooKeeper.
   if (!masters.empty() && url.isNone()) {
@@ -333,6 +343,40 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
     master.authorizer = authorizer__;
   }
 
+  if (slaveRemovalLimiter.isNone() &&
+      flags.slave_removal_rate_limit.isSome()) {
+    // Parse the flag value.
+    // TODO(vinod): Move this parsing logic to flags once we have a
+    // 'Rate' abstraction in stout.
+    std::vector<std::string> tokens =
+      strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+    if (tokens.size() != 2) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>";
+    }
+
+    Try<int> permits = numify<int>(tokens[0]);
+    if (permits.isError()) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>"
+              << ": " << permits.error();
+    }
+
+    Try<Duration> duration = Duration::parse(tokens[1]);
+    if (duration.isError()) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>"
+              << ": " << duration.error();
+    }
+
+    master.slaveRemovalLimiter = memory::shared_ptr<process::RateLimiter>(
+        new process::RateLimiter(permits.get(), duration.get()));
+  }
+
   master.master = new master::Master(
       master.allocator,
       master.registrar.get(),
@@ -340,7 +384,10 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
       &cluster->files,
       master.contender.get(),
       master.detector.get(),
-      authorizer.isSome() ? authorizer : master.authorizer.get(),
+      authorizer.isSome()
+          ? authorizer : master.authorizer.get(),
+      slaveRemovalLimiter.isSome()
+          ? slaveRemovalLimiter : master.slaveRemovalLimiter,
       flags);
 
   if (url.isNone()) {