You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/02/28 03:07:08 UTC
[1/5] mesos git commit: Added virtual to RateLimiter methods to
enable mocking.
Repository: mesos
Updated Branches:
refs/heads/master bc7a24cf3 -> d55a7239c
Added virtual to RateLimiter methods to enable mocking.
Review: https://reviews.apache.org/r/31511
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1e8f55da
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1e8f55da
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1e8f55da
Branch: refs/heads/master
Commit: 1e8f55daa348500cf2634a2671431d6d0b901059
Parents: bc7a24c
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 26 15:30:42 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Feb 27 16:28:50 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/limiter.hpp | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e8f55da/3rdparty/libprocess/include/process/limiter.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/limiter.hpp b/3rdparty/libprocess/include/process/limiter.hpp
index 4bf24a8..444aa1b 100644
--- a/3rdparty/libprocess/include/process/limiter.hpp
+++ b/3rdparty/libprocess/include/process/limiter.hpp
@@ -32,11 +32,11 @@ class RateLimiter
public:
RateLimiter(int permits, const Duration& duration);
explicit RateLimiter(double permitsPerSecond);
- ~RateLimiter();
+ virtual ~RateLimiter();
// Returns a future that becomes ready when the permit is acquired.
// Discarding this future cancels this acquisition.
- Future<Nothing> acquire();
+ virtual Future<Nothing> acquire();
private:
// Not copyable, not assignable.
[5/5] mesos git commit: Added tests for rate limiting slave removals
during master failover.
Posted by bm...@apache.org.
Added tests for rate limiting slave removals during master failover.
Review: https://reviews.apache.org/r/31516
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d55a7239
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d55a7239
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d55a7239
Branch: refs/heads/master
Commit: d55a7239cf0707ec291d73e30b69a07f7ee845d3
Parents: 26546a4
Author: Benjamin Mahler <be...@gmail.com>
Authored: Wed Feb 25 17:29:14 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Feb 27 17:52:40 2015 -0800
----------------------------------------------------------------------
src/tests/master_tests.cpp | 173 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 173 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d55a7239/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 5692f07..3af20e9 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -37,6 +37,7 @@
#include <process/metrics/metrics.hpp>
#include <stout/json.hpp>
+#include <stout/memory.hpp>
#include <stout/net.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
@@ -56,9 +57,12 @@
#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/containerizer.hpp"
+#include "tests/limiter.hpp"
#include "tests/mesos.hpp"
#include "tests/utils.hpp"
+using memory::shared_ptr;
+
using mesos::internal::master::Master;
using mesos::internal::master::allocator::MesosAllocatorProcess;
@@ -72,6 +76,7 @@ using process::Clock;
using process::Future;
using process::Owned;
using process::PID;
+using process::Promise;
using process::UPID;
using std::string;
@@ -1843,6 +1848,174 @@ TEST_F(MasterTest, NonStrictRegistryWriteOnly)
}
+// This test ensures that slave removals during master recovery
+// are rate limited.
+TEST_F(MasterTest, RateLimitRecoveredSlaveRemoval)
+{
+ // Start a master.
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<PID<Master> > master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), _);
+
+ // Start a slave.
+ Try<PID<Slave> > slave = StartSlave();
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ // Stop the slave while the master is down.
+ this->Stop(master.get());
+ this->Stop(slave.get());
+
+ shared_ptr<MockRateLimiter> slaveRemovalLimiter(new MockRateLimiter());
+
+ // Return a pending future from the rate limiter.
+ Future<Nothing> acquire;
+ Promise<Nothing> promise;
+ EXPECT_CALL(*slaveRemovalLimiter, acquire())
+ .WillOnce(DoAll(FutureSatisfy(&acquire),
+ Return(promise.future())));
+
+ // Restart the master.
+ master = StartMaster(slaveRemovalLimiter, masterFlags);
+ ASSERT_SOME(master);
+
+ // Start a scheduler to ensure the master would notify
+ // a framework about slave removal.
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<Nothing> registered;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ Future<Nothing> slaveLost;
+ EXPECT_CALL(sched, slaveLost(&driver, _))
+ .WillRepeatedly(FutureSatisfy(&slaveLost));
+
+ driver.start();
+
+ AWAIT_READY(registered);
+
+ // Trigger the slave re-registration timeout.
+ Clock::pause();
+ Clock::advance(masterFlags.slave_reregister_timeout);
+
+ // The master should attempt to acquire a permit.
+ AWAIT_READY(acquire);
+
+ // The removal should not occur before the permit is satisfied.
+ Clock::settle();
+ ASSERT_TRUE(slaveLost.isPending());
+
+ // Once the permit is satisfied, the slave should be removed.
+ promise.set(Nothing());
+ AWAIT_READY(slaveLost);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test ensures that slave removals that get scheduled during
+// master recovery can be canceled if the slave re-registers.
+TEST_F(MasterTest, CancelRecoveredSlaveRemoval)
+{
+ // Start a master.
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<PID<Master> > master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), _);
+
+ // Start a slave with checkpointing.
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.checkpoint = true;
+ slaveFlags.recover = "reconnect";
+ slaveFlags.strict = true;
+ Try<PID<Slave> > slave = StartSlave(slaveFlags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ // Stop the slave while the master is down.
+ this->Stop(master.get());
+ this->Stop(slave.get());
+
+ shared_ptr<MockRateLimiter> slaveRemovalLimiter(new MockRateLimiter());
+
+ // Return a pending future from the rate limiter.
+ Future<Nothing> acquire;
+ Promise<Nothing> promise;
+ EXPECT_CALL(*slaveRemovalLimiter, acquire())
+ .WillOnce(DoAll(FutureSatisfy(&acquire),
+ Return(promise.future())));
+
+ // Restart the master.
+ master = StartMaster(slaveRemovalLimiter, masterFlags);
+ ASSERT_SOME(master);
+
+ // Start a scheduler to ensure the master would notify
+ // a framework about slave removal.
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<Nothing> registered;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ Future<Nothing> slaveLost;
+ EXPECT_CALL(sched, slaveLost(&driver, _))
+ .WillRepeatedly(FutureSatisfy(&slaveLost));
+
+ driver.start();
+
+ AWAIT_READY(registered);
+
+ // Trigger the slave re-registration timeout.
+ Clock::pause();
+ Clock::advance(masterFlags.slave_reregister_timeout);
+
+ // The master should attempt to acquire a permit.
+ AWAIT_READY(acquire);
+
+ // The removal should not occur before the permit is satisfied.
+ Clock::settle();
+ ASSERT_TRUE(slaveLost.isPending());
+
+ // Ignore resource offers from the re-registered slave.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillRepeatedly(Return());
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get(), _);
+
+ // Restart the slave.
+ slave = StartSlave(slaveFlags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveReregisteredMessage);
+
+ // Satisfy the rate limit permit. Ensure a removal does not occur!
+ promise.set(Nothing());
+ Clock::settle();
+ ASSERT_TRUE(slaveLost.isPending());
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
// This test ensures that when a slave is recovered from the registry
// and re-registers with the master, it is *not* removed after the
// re-registration timeout elapses.
[4/5] mesos git commit: Added rate limiting to slave removals during
master failover.
Posted by bm...@apache.org.
Added rate limiting to slave removals during master failover.
Review: https://reviews.apache.org/r/31515
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/26546a4b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/26546a4b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/26546a4b
Branch: refs/heads/master
Commit: 26546a4bb8dd64479cd1dfd1d450b37c59e280cc
Parents: 52c64bd
Author: Benjamin Mahler <be...@gmail.com>
Authored: Tue Feb 24 17:33:55 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Feb 27 17:52:39 2015 -0800
----------------------------------------------------------------------
src/master/master.cpp | 95 ++++++++++++++++++++++++++++++++--------------
src/master/master.hpp | 4 ++
2 files changed, 71 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/26546a4b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 4a1b428..53c8696 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -220,9 +220,8 @@ protected:
acquire = limiter.get()->acquire();
}
- ++metrics->slave_shutdowns_scheduled;
-
shuttingDown = acquire.onAny(defer(self(), &Self::_shutdown));
+ ++metrics->slave_shutdowns_scheduled;
}
void _shutdown()
@@ -1227,40 +1226,80 @@ void Master::recoveredSlavesTimeout(const Registry& registry)
<< " investigate or increase this limit to proceed further";
}
+ // Remove the slaves in a rate limited manner, similar to how the
+ // SlaveObserver removes slaves.
foreach (const Registry::Slave& slave, registry.slaves().slaves()) {
- if (!slaves.recovered.contains(slave.info().id())) {
- continue; // Slave re-registered.
+ Future<Nothing> acquire = Nothing();
+
+ if (slaves.limiter.isSome()) {
+ LOG(INFO) << "Scheduling removal of slave "
+ << slave.info().id() << " (" << slave.info().hostname() << ")"
+ << "; did not re-register within "
+ << flags.slave_reregister_timeout << " after master failover";
+
+ acquire = slaves.limiter.get()->acquire();
}
- LOG(WARNING) << "Slave " << slave.info().id()
- << " (" << slave.info().hostname() << ") did not re-register "
- << "within the timeout; removing it from the registrar";
+ // Need to disambiguate for the compiler.
+ // TODO(bmahler): With C++11, just call removeSlave from within
+ // a lambda function to avoid the need to disambiguate.
+ Nothing (Master::*removeSlave)(const Registry::Slave&) = &Self::removeSlave;
+ const string failure = "Slave removal rate limit acquisition failed";
- ++metrics->recovery_slave_removals;
+ acquire
+ .then(defer(self(), removeSlave, slave))
+ .onFailed(lambda::bind(fail, failure, lambda::_1))
+ .onDiscarded(lambda::bind(fail, failure, "discarded"));
- slaves.recovered.erase(slave.info().id());
+ ++metrics->slave_shutdowns_scheduled;
+ }
+}
- if (flags.registry_strict) {
- slaves.removing.insert(slave.info().id());
- registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
- .onAny(defer(self(),
- &Self::_removeSlave,
- slave.info(),
- vector<StatusUpdate>(), // No TASK_LOST updates to send.
- lambda::_1));
- } else {
- // When a non-strict registry is in use, we want to ensure the
- // registry is used in a write-only manner. Therefore we remove
- // the slave from the registry but we do not inform the
- // framework.
- const string& message =
- "Failed to remove slave " + stringify(slave.info().id());
-
- registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
- .onFailed(lambda::bind(fail, message, lambda::_1));
- }
+Nothing Master::removeSlave(const Registry::Slave& slave)
+{
+ // The slave is removed from 'recovered' when it re-registers.
+ if (!slaves.recovered.contains(slave.info().id())) {
+ LOG(INFO) << "Canceling removal of slave "
+ << slave.info().id() << " (" << slave.info().hostname() << ")"
+ << " since it re-registered!";
+
+ ++metrics->slave_shutdowns_canceled;
+
+ return Nothing();
}
+
+ LOG(WARNING) << "Slave " << slave.info().id()
+ << " (" << slave.info().hostname() << ") did not re-register"
+ << " within " << flags.slave_reregister_timeout
+ << " after master failover; removing it from the registrar";
+
+ ++metrics->recovery_slave_removals;
+
+ slaves.recovered.erase(slave.info().id());
+
+ if (flags.registry_strict) {
+ slaves.removing.insert(slave.info().id());
+
+ registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
+ .onAny(defer(self(),
+ &Self::_removeSlave,
+ slave.info(),
+ vector<StatusUpdate>(), // No TASK_LOST updates to send.
+ lambda::_1));
+ } else {
+ // When a non-strict registry is in use, we want to ensure the
+ // registry is used in a write-only manner. Therefore we remove
+ // the slave from the registry but we do not inform the
+ // framework.
+ const string& message =
+ "Failed to remove slave " + stringify(slave.info().id());
+
+ registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
+ .onFailed(lambda::bind(fail, message, lambda::_1));
+ }
+
+ return Nothing();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/26546a4b/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 3e8a8dc..ce0e0b3 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -362,6 +362,10 @@ protected:
const std::vector<Archive::Framework>& completedFrameworks =
std::vector<Archive::Framework>());
+ // Remove the slave from the registrar. Called when the slave
+ // does not re-register in time after a master failover.
+ Nothing removeSlave(const Registry::Slave& slave);
+
// Remove the slave from the registrar and from the master's state.
void removeSlave(Slave* slave);
void _removeSlave(
[3/5] mesos git commit: Improved slave removal rate limiting tests.
Posted by bm...@apache.org.
Improved slave removal rate limiting tests.
Review: https://reviews.apache.org/r/31514
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/52c64bdb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/52c64bdb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/52c64bdb
Branch: refs/heads/master
Commit: 52c64bdbf85357b5cec8098da13fa7cf20cbb1cf
Parents: ad09e90
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 26 15:31:56 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Feb 27 17:41:41 2015 -0800
----------------------------------------------------------------------
src/Makefile.am | 1 +
src/tests/limiter.hpp | 46 +++++++++++++++++
src/tests/mesos.cpp | 15 ++++++
src/tests/mesos.hpp | 6 +++
src/tests/slave_tests.cpp | 113 ++++++++++++++++++++---------------------
5 files changed, 122 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/52c64bdb/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 17d0d7a..d299f07 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -550,6 +550,7 @@ libmesos_no_3rdparty_la_SOURCES += \
tests/flags.hpp \
tests/isolator.hpp \
tests/launcher.hpp \
+ tests/limiter.hpp \
tests/mesos.hpp \
tests/module.hpp \
tests/script.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/52c64bdb/src/tests/limiter.hpp
----------------------------------------------------------------------
diff --git a/src/tests/limiter.hpp b/src/tests/limiter.hpp
new file mode 100644
index 0000000..1bdf8ae
--- /dev/null
+++ b/src/tests/limiter.hpp
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __TEST_LIMITER_HPP__
+#define __TEST_LIMITER_HPP__
+
+#include <gmock/gmock.h>
+
+#include <process/limiter.hpp>
+
+#include <stout/duration.hpp>
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class MockRateLimiter : public process::RateLimiter
+{
+public:
+ // We initialize the rate limiter to 1 per second because it has to
+ // be non-zero, but this value has no effect since this is a mock.
+ MockRateLimiter() : process::RateLimiter(1, Seconds(1)) {}
+
+ MOCK_METHOD0(acquire, process::Future<Nothing>());
+};
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __TEST_LIMITER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/52c64bdb/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 23f790c..c8f43d2 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -21,6 +21,7 @@
#include <stout/check.hpp>
#include <stout/foreach.hpp>
#include <stout/json.hpp>
+#include <stout/memory.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/result.hpp>
@@ -47,6 +48,8 @@
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
+using memory::shared_ptr;
+
using std::string;
using testing::_;
using testing::Invoke;
@@ -209,6 +212,18 @@ Try<PID<master::Master> > MesosTest::StartMaster(
}
+Try<PID<master::Master>> MesosTest::StartMaster(
+ const shared_ptr<MockRateLimiter>& slaveRemovalLimiter,
+ const Option<master::Flags>& flags)
+{
+ return cluster.masters.start(
+ flags.isNone() ? CreateMasterFlags() : flags.get(),
+ None(),
+ None(),
+ slaveRemovalLimiter);
+}
+
+
Try<PID<slave::Slave> > MesosTest::StartSlave(
const Option<slave::Flags>& flags)
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/52c64bdb/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index f7a0d05..e91e5e4 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -57,6 +57,7 @@
#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/cluster.hpp"
+#include "tests/limiter.hpp"
#include "tests/utils.hpp"
#ifdef MESOS_HAS_JAVA
@@ -105,6 +106,11 @@ protected:
Authorizer* authorizer,
const Option<master::Flags>& flags = None());
+ // Starts a master with a slave removal rate limiter and flags.
+ virtual Try<process::PID<master::Master> > StartMaster(
+ const memory::shared_ptr<MockRateLimiter>& slaveRemovalLimiter,
+ const Option<master::Flags>& flags = None());
+
// TODO(bmahler): Consider adding a builder style interface, e.g.
//
// Try<PID<Slave> > slave =
http://git-wip-us.apache.org/repos/asf/mesos/blob/52c64bdb/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 7ea012a..a74ff7b 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -36,6 +36,7 @@
#include <process/pid.hpp>
#include <process/subprocess.hpp>
+#include <stout/memory.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/try.hpp>
@@ -56,8 +57,11 @@
#include "tests/containerizer.hpp"
#include "tests/flags.hpp"
+#include "tests/limiter.hpp"
#include "tests/mesos.hpp"
+using memory::shared_ptr;
+
using namespace mesos::internal::slave;
using namespace process;
@@ -1302,14 +1306,13 @@ TEST_F(SlaveTest, PingTimeoutSomePings)
// This test ensures that when slave removal rate limit is specified
-// a slave that fails health checks is removed after acquiring permit
-// from the rate limiter.
+// a slave that fails health checks is removed after a permit is
+// provided by the rate limiter.
TEST_F(SlaveTest, RateLimitSlaveShutdown)
{
// Start a master.
- master::Flags flags = CreateMasterFlags();
- flags.slave_removal_rate_limit = "1/1secs";
- Try<PID<Master> > master = StartMaster(flags);
+ shared_ptr<MockRateLimiter> slaveRemovalLimiter(new MockRateLimiter());
+ Try<PID<Master> > master = StartMaster(slaveRemovalLimiter);
ASSERT_SOME(master);
// Set these expectations up before we spawn the slave so that we
@@ -1328,31 +1331,39 @@ TEST_F(SlaveTest, RateLimitSlaveShutdown)
AWAIT_READY(slaveRegisteredMessage);
- Future<Nothing> acquire =
- FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+ // Return a pending future from the rate limiter.
+ Future<Nothing> acquire;
+ Promise<Nothing> promise;
+ EXPECT_CALL(*slaveRemovalLimiter, acquire())
+ .WillOnce(DoAll(FutureSatisfy(&acquire),
+ Return(promise.future())));
Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _);
- // Now advance through the PINGs.
+ // Induce a health check failure of the slave.
Clock::pause();
uint32_t pings = 0;
while (true) {
AWAIT_READY(ping);
pings++;
if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
break;
}
ping = FUTURE_MESSAGE(Eq("PING"), _, _);
Clock::advance(master::SLAVE_PING_TIMEOUT);
}
- Clock::advance(master::SLAVE_PING_TIMEOUT);
- Clock::settle();
-
- // Master should acquire the permit for shutting down the slave.
+ // The master should attempt to acquire a permit.
AWAIT_READY(acquire);
- // Master should shutdown the slave.
+ // The shutdown should not occur before the permit is satisfied.
+ Clock::settle();
+ ASSERT_TRUE(shutdown.isPending());
+
+ // Once the permit is satisfied, the shutdown message
+ // should be sent.
+ promise.set(Nothing());
AWAIT_READY(shutdown);
}
@@ -1363,11 +1374,8 @@ TEST_F(SlaveTest, RateLimitSlaveShutdown)
TEST_F(SlaveTest, CancelSlaveShutdown)
{
// Start a master.
- master::Flags flags = CreateMasterFlags();
- // Interval between slave removals.
- Duration interval = master::SLAVE_PING_TIMEOUT * 10;
- flags.slave_removal_rate_limit = "1/" + stringify(interval);
- Try<PID<Master> > master = StartMaster(flags);
+ shared_ptr<MockRateLimiter> slaveRemovalLimiter(new MockRateLimiter());
+ Try<PID<Master> > master = StartMaster(slaveRemovalLimiter);
ASSERT_SOME(master);
// Set these expectations up before we spawn the slave so that we
@@ -1377,71 +1385,58 @@ TEST_F(SlaveTest, CancelSlaveShutdown)
// Drop all the PONGs to simulate health check timeout.
DROP_MESSAGES(Eq("PONG"), _, _);
- // NOTE: We start two slaves in this test so that the rate limiter
- // used by the slave observers gives out 2 permits. The first permit
- // gets satisfied immediately. And since the 2nd permit will be
- // enqueued, it's corresponding future can be discarded before it
- // becomes ready.
- // TODO(vinod): Inject a rate limiter into 'Master' instead to
- // simplify the test.
-
- // Start the first slave.
- Future<SlaveRegisteredMessage> slaveRegisteredMessage1 =
- FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
- Try<PID<Slave> > slave1 = StartSlave();
- ASSERT_SOME(slave1);
-
- AWAIT_READY(slaveRegisteredMessage1);
+ // No shutdown should occur during the test!
+ EXPECT_NO_FUTURE_PROTOBUFS(ShutdownMessage(), _, _);
- // Start the second slave.
- Future<SlaveRegisteredMessage> slaveRegisteredMessage2 =
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
- Try<PID<Slave> > slave2 = StartSlave();
- ASSERT_SOME(slave2);
-
- AWAIT_READY(slaveRegisteredMessage2);
-
- Future<Nothing> acquire1 =
- FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+ // Start a slave.
+ Try<PID<Slave> > slave = StartSlave();
+ ASSERT_SOME(slave);
- Future<Nothing> acquire2 =
- FUTURE_DISPATCH(_, &RateLimiterProcess::acquire);
+ AWAIT_READY(slaveRegisteredMessage);
- Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+ // Return a pending future from the rate limiter.
+ Future<Nothing> acquire;
+ Promise<Nothing> promise;
+ EXPECT_CALL(*slaveRemovalLimiter, acquire())
+ .WillOnce(DoAll(FutureSatisfy(&acquire),
+ Return(promise.future())));
- // Now advance through the PINGs until shutdown permits are given
- // out for both the slaves.
+ // Induce a health check failure of the slave.
Clock::pause();
+ uint32_t pings = 0;
while (true) {
AWAIT_READY(ping);
- if (acquire1.isReady() && acquire2.isReady()) {
+ pings++;
+ if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
break;
}
ping = FUTURE_MESSAGE(Eq("PING"), _, _);
Clock::advance(master::SLAVE_PING_TIMEOUT);
}
- Clock::settle();
- // The slave that first timed out should be shutdown right away.
- AWAIT_READY(shutdown);
+ // The master should attempt to acquire a permit.
+ AWAIT_READY(acquire);
- // Ensure the 2nd slave's shutdown is canceled.
- EXPECT_NO_FUTURE_PROTOBUFS(ShutdownMessage(), _, _);
+ // Settle to make sure the shutdown does not occur.
+ Clock::settle();
- // Reset the filters to allow pongs from the 2nd slave.
+ // Reset the filters to allow pongs from the slave.
filter(NULL);
// Advance clock enough to do a ping pong.
Clock::advance(master::SLAVE_PING_TIMEOUT);
Clock::settle();
- // Now advance the clock to the time the 2nd permit is acquired.
- Clock::advance(interval);
+ // The master should have tried to cancel the removal.
+ ASSERT_TRUE(promise.future().hasDiscard());
- // Settle the clock to give a chance for the master to shutdown
- // the 2nd slave (it shouldn't in this test).
+ // Allow the cancelation and settle the clock to ensure a shutdown
+ // does not occur.
+ promise.discard();
Clock::settle();
}
[2/5] mesos git commit: Added ability to inject the RateLimiter for
slave removals.
Posted by bm...@apache.org.
Added ability to inject the RateLimiter for slave removals.
Review: https://reviews.apache.org/r/31512
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ad09e909
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ad09e909
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ad09e909
Branch: refs/heads/master
Commit: ad09e9095bcd4015b7ded07cde4e0e6ac8e1948e
Parents: 1e8f55d
Author: Benjamin Mahler <be...@gmail.com>
Authored: Wed Feb 25 18:26:19 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Feb 27 16:28:51 2015 -0800
----------------------------------------------------------------------
src/local/local.cpp | 41 +++++++++++++++++++++++++++++++++++
src/master/main.cpp | 43 +++++++++++++++++++++++++++++++++++++
src/master/master.cpp | 37 ++++++--------------------------
src/master/master.hpp | 3 +++
src/tests/cluster.hpp | 53 +++++++++++++++++++++++++++++++++++++++++++---
5 files changed, 143 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 8189edb..1908336 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -19,15 +19,19 @@
#include <map>
#include <set>
#include <sstream>
+#include <string>
#include <vector>
#include <mesos/module/anonymous.hpp>
+#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
+#include <stout/duration.hpp>
#include <stout/exit.hpp>
#include <stout/foreach.hpp>
+#include <stout/memory.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/try.hpp>
@@ -66,6 +70,8 @@
#include "state/protobuf.hpp"
#include "state/storage.hpp"
+using memory::shared_ptr;
+
using namespace mesos::internal;
using namespace mesos::internal::log;
@@ -87,6 +93,7 @@ using mesos::modules::ModuleManager;
using process::Owned;
using process::PID;
+using process::RateLimiter;
using process::UPID;
using std::map;
@@ -200,6 +207,39 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
authorizer = authorizer__.release();
}
+ Option<shared_ptr<RateLimiter>> slaveRemovalLimiter = None();
+ if (flags.slave_removal_rate_limit.isSome()) {
+ // Parse the flag value.
+ // TODO(vinod): Move this parsing logic to flags once we have a
+ // 'Rate' abstraction in stout.
+ vector<string> tokens =
+ strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+ if (tokens.size() != 2) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>";
+ }
+
+ Try<int> permits = numify<int>(tokens[0]);
+ if (permits.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << permits.error();
+ }
+
+ Try<Duration> duration = Duration::parse(tokens[1]);
+ if (duration.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << duration.error();
+ }
+
+ slaveRemovalLimiter = new RateLimiter(permits.get(), duration.get());
+ }
+
// Create anonymous modules.
foreach (const string& name, ModuleManager::find<Anonymous>()) {
Try<Anonymous*> create = ModuleManager::create<Anonymous>(name);
@@ -224,6 +264,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
contender,
detector,
authorizer,
+ slaveRemovalLimiter,
flags);
detector->appoint(master->info());
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index f202019..7cce3a0 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -19,17 +19,22 @@
#include <stdint.h>
#include <set>
+#include <string>
+#include <vector>
#include <mesos/mesos.hpp>
#include <mesos/module/anonymous.hpp>
+#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <stout/check.hpp>
+#include <stout/duration.hpp>
#include <stout/exit.hpp>
#include <stout/flags.hpp>
+#include <stout/memory.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
@@ -72,12 +77,15 @@ using namespace mesos::internal::log;
using namespace mesos::internal::master;
using namespace zookeeper;
+using memory::shared_ptr;
+
using mesos::MasterInfo;
using mesos::modules::Anonymous;
using mesos::modules::ModuleManager;
using process::Owned;
+using process::RateLimiter;
using process::UPID;
using std::cerr;
@@ -85,6 +93,7 @@ using std::cout;
using std::endl;
using std::set;
using std::string;
+using std::vector;
void usage(const char* argv0, const flags::FlagsBase& flags)
@@ -287,6 +296,39 @@ int main(int argc, char** argv)
authorizer = authorizer__.release();
}
+ Option<shared_ptr<RateLimiter>> slaveRemovalLimiter = None();
+ if (flags.slave_removal_rate_limit.isSome()) {
+ // Parse the flag value.
+ // TODO(vinod): Move this parsing logic to flags once we have a
+ // 'Rate' abstraction in stout.
+ vector<string> tokens =
+ strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+ if (tokens.size() != 2) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>";
+ }
+
+ Try<int> permits = numify<int>(tokens[0]);
+ if (permits.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << permits.error();
+ }
+
+ Try<Duration> duration = Duration::parse(tokens[1]);
+ if (duration.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << duration.error();
+ }
+
+ slaveRemovalLimiter = new RateLimiter(permits.get(), duration.get());
+ }
+
// Create anonymous modules.
foreach (const string& name, ModuleManager::find<Anonymous>()) {
Try<Anonymous*> create = ModuleManager::create<Anonymous>(name);
@@ -314,6 +356,7 @@ int main(int argc, char** argv)
contender,
detector,
authorizer,
+ slaveRemovalLimiter,
flags);
if (zk.isNone()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 76e217d..4a1b428 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -273,6 +273,7 @@ Master::Master(
MasterContender* _contender,
MasterDetector* _detector,
const Option<Authorizer*>& _authorizer,
+ const Option<shared_ptr<RateLimiter>>& _slaveRemovalLimiter,
const Flags& _flags)
: ProcessBase("master"),
http(this),
@@ -287,6 +288,8 @@ Master::Master(
metrics(new Metrics(*this)),
electedTime(None())
{
+ slaves.limiter = _slaveRemovalLimiter;
+
// NOTE: We populate 'info_' here instead of inside 'initialize()'
// because 'StandaloneMasterDetector' needs access to the info.
@@ -489,39 +492,11 @@ void Master::initialize()
LOG(INFO) << "Framework rate limiting enabled";
}
- if (flags.slave_removal_rate_limit.isSome()) {
+ // Only log the configured rate when the flag is set: a limiter that
+ // is injected directly (e.g., by tests) may come without the flag.
+ if (slaves.limiter.isSome() && flags.slave_removal_rate_limit.isSome()) {
LOG(INFO) << "Slave removal is rate limited to "
<< flags.slave_removal_rate_limit.get();
-
- // Parse the flag value.
- // TODO(vinod): Move this parsing logic to flags once we have a
- // 'Rate' abstraction in stout.
- vector<string> tokens =
- strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
-
- if (tokens.size() != 2) {
- EXIT(1) << "Invalid slave_removal_rate_limit: "
- << flags.slave_removal_rate_limit.get()
- << ". Format is <Number of slaves>/<Duration>";
- }
-
- Try<int> permits = numify<int>(tokens[0]);
- if (permits.isError()) {
- EXIT(1) << "Invalid slave_removal_rate_limit: "
- << flags.slave_removal_rate_limit.get()
- << ". Format is <Number of slaves>/<Duration>"
- << ": " << permits.error();
- }
-
- Try<Duration> duration = Duration::parse(tokens[1]);
- if (duration.isError()) {
- EXIT(1) << "Invalid slave_removal_rate_limit: "
- << flags.slave_removal_rate_limit.get()
- << ". Format is <Number of slaves>/<Duration>"
- << ": " << duration.error();
- }
-
- slaves.limiter = new RateLimiter(permits.get(), duration.get());
}
hashmap<string, RoleInfo> roleInfos;
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e288cdb..3e8a8dc 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -34,6 +34,7 @@
#include <mesos/module/authenticator.hpp>
+#include <process/limiter.hpp>
#include <process/http.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
@@ -104,6 +105,8 @@ public:
MasterContender* contender,
MasterDetector* detector,
const Option<Authorizer*>& authorizer,
+ const Option<memory::shared_ptr<process::RateLimiter>>&
+ slaveRemovalLimiter,
const Flags& flags = Flags());
virtual ~Master();
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index da242d9..a56b654 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -20,6 +20,8 @@
#define __TESTS_CLUSTER_HPP__
#include <map>
+#include <string>
+#include <vector>
#include <mesos/mesos.hpp>
@@ -27,13 +29,16 @@
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
+#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
+#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/foreach.hpp>
#include <stout/none.hpp>
+#include <stout/memory.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/path.hpp>
@@ -101,7 +106,9 @@ public:
Try<process::PID<master::Master> > start(
const master::Flags& flags = master::Flags(),
const Option<master::allocator::Allocator*>& allocator = None(),
- const Option<Authorizer*>& authorizer = None());
+ const Option<Authorizer*>& authorizer = None(),
+ const Option<memory::shared_ptr<process::RateLimiter> >&
+ slaveRemovalLimiter = None());
// Stops and cleans up a master at the specified PID.
Try<Nothing> stop(const process::PID<master::Master>& pid);
@@ -137,6 +144,8 @@ public:
process::Owned<Authorizer> authorizer;
+ Option<memory::shared_ptr<process::RateLimiter>> slaveRemovalLimiter;
+
master::Master* master;
};
@@ -246,7 +255,8 @@ inline void Cluster::Masters::shutdown()
inline Try<process::PID<master::Master> > Cluster::Masters::start(
const master::Flags& flags,
const Option<master::allocator::Allocator*>& allocator,
- const Option<Authorizer*>& authorizer)
+ const Option<Authorizer*>& authorizer,
+ const Option<memory::shared_ptr<process::RateLimiter>>& slaveRemovalLimiter)
{
// Disallow multiple masters when not using ZooKeeper.
if (!masters.empty() && url.isNone()) {
@@ -333,6 +343,40 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
master.authorizer = authorizer__;
}
+ if (slaveRemovalLimiter.isNone() &&
+ flags.slave_removal_rate_limit.isSome()) {
+ // Parse the flag value.
+ // TODO(vinod): Move this parsing logic to flags once we have a
+ // 'Rate' abstraction in stout.
+ std::vector<std::string> tokens =
+ strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+ if (tokens.size() != 2) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>";
+ }
+
+ Try<int> permits = numify<int>(tokens[0]);
+ if (permits.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << permits.error();
+ }
+
+ Try<Duration> duration = Duration::parse(tokens[1]);
+ if (duration.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << duration.error();
+ }
+
+ master.slaveRemovalLimiter = memory::shared_ptr<process::RateLimiter>(
+ new process::RateLimiter(permits.get(), duration.get()));
+ }
+
master.master = new master::Master(
master.allocator,
master.registrar.get(),
@@ -340,7 +384,10 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
&cluster->files,
master.contender.get(),
master.detector.get(),
- authorizer.isSome() ? authorizer : master.authorizer.get(),
+ authorizer.isSome()
+ ? authorizer : master.authorizer.get(),
+ slaveRemovalLimiter.isSome()
+ ? slaveRemovalLimiter : master.slaveRemovalLimiter,
flags);
if (url.isNone()) {