You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/02/28 03:07:09 UTC
[2/5] mesos git commit: Added ability to inject the RateLimiter for
slave removals.
Added ability to inject the RateLimiter for slave removals.
Review: https://reviews.apache.org/r/31512
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ad09e909
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ad09e909
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ad09e909
Branch: refs/heads/master
Commit: ad09e9095bcd4015b7ded07cde4e0e6ac8e1948e
Parents: 1e8f55d
Author: Benjamin Mahler <be...@gmail.com>
Authored: Wed Feb 25 18:26:19 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Feb 27 16:28:51 2015 -0800
----------------------------------------------------------------------
src/local/local.cpp | 41 +++++++++++++++++++++++++++++++++++
src/master/main.cpp | 43 +++++++++++++++++++++++++++++++++++++
src/master/master.cpp | 37 ++++++--------------------------
src/master/master.hpp | 3 +++
src/tests/cluster.hpp | 53 +++++++++++++++++++++++++++++++++++++++++++---
5 files changed, 143 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 8189edb..1908336 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -19,15 +19,19 @@
#include <map>
#include <set>
#include <sstream>
+#include <string>
#include <vector>
#include <mesos/module/anonymous.hpp>
+#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
+#include <stout/duration.hpp>
#include <stout/exit.hpp>
#include <stout/foreach.hpp>
+#include <stout/memory.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/try.hpp>
@@ -66,6 +70,8 @@
#include "state/protobuf.hpp"
#include "state/storage.hpp"
+using memory::shared_ptr;
+
using namespace mesos::internal;
using namespace mesos::internal::log;
@@ -87,6 +93,7 @@ using mesos::modules::ModuleManager;
using process::Owned;
using process::PID;
+using process::RateLimiter;
using process::UPID;
using std::map;
@@ -200,6 +207,39 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
authorizer = authorizer__.release();
}
+ Option<shared_ptr<RateLimiter>> slaveRemovalLimiter = None();
+ if (flags.slave_removal_rate_limit.isSome()) {
+ // Parse the flag value.
+ // TODO(vinod): Move this parsing logic to flags once we have a
+ // 'Rate' abstraction in stout.
+ vector<string> tokens =
+ strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+ if (tokens.size() != 2) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>";
+ }
+
+ Try<int> permits = numify<int>(tokens[0]);
+ if (permits.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << permits.error();
+ }
+
+ Try<Duration> duration = Duration::parse(tokens[1]);
+ if (duration.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << duration.error();
+ }
+
+ slaveRemovalLimiter = new RateLimiter(permits.get(), duration.get());
+ }
+
// Create anonymous modules.
foreach (const string& name, ModuleManager::find<Anonymous>()) {
Try<Anonymous*> create = ModuleManager::create<Anonymous>(name);
@@ -224,6 +264,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
contender,
detector,
authorizer,
+ slaveRemovalLimiter,
flags);
detector->appoint(master->info());
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index f202019..7cce3a0 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -19,17 +19,22 @@
#include <stdint.h>
#include <set>
+#include <string>
+#include <vector>
#include <mesos/mesos.hpp>
#include <mesos/module/anonymous.hpp>
+#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <stout/check.hpp>
+#include <stout/duration.hpp>
#include <stout/exit.hpp>
#include <stout/flags.hpp>
+#include <stout/memory.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
@@ -72,12 +77,15 @@ using namespace mesos::internal::log;
using namespace mesos::internal::master;
using namespace zookeeper;
+using memory::shared_ptr;
+
using mesos::MasterInfo;
using mesos::modules::Anonymous;
using mesos::modules::ModuleManager;
using process::Owned;
+using process::RateLimiter;
using process::UPID;
using std::cerr;
@@ -85,6 +93,7 @@ using std::cout;
using std::endl;
using std::set;
using std::string;
+using std::vector;
void usage(const char* argv0, const flags::FlagsBase& flags)
@@ -287,6 +296,39 @@ int main(int argc, char** argv)
authorizer = authorizer__.release();
}
+ Option<shared_ptr<RateLimiter>> slaveRemovalLimiter = None();
+ if (flags.slave_removal_rate_limit.isSome()) {
+ // Parse the flag value.
+ // TODO(vinod): Move this parsing logic to flags once we have a
+ // 'Rate' abstraction in stout.
+ vector<string> tokens =
+ strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+ if (tokens.size() != 2) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>";
+ }
+
+ Try<int> permits = numify<int>(tokens[0]);
+ if (permits.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << permits.error();
+ }
+
+ Try<Duration> duration = Duration::parse(tokens[1]);
+ if (duration.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << duration.error();
+ }
+
+ slaveRemovalLimiter = new RateLimiter(permits.get(), duration.get());
+ }
+
// Create anonymous modules.
foreach (const string& name, ModuleManager::find<Anonymous>()) {
Try<Anonymous*> create = ModuleManager::create<Anonymous>(name);
@@ -314,6 +356,7 @@ int main(int argc, char** argv)
contender,
detector,
authorizer,
+ slaveRemovalLimiter,
flags);
if (zk.isNone()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 76e217d..4a1b428 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -273,6 +273,7 @@ Master::Master(
MasterContender* _contender,
MasterDetector* _detector,
const Option<Authorizer*>& _authorizer,
+ const Option<shared_ptr<RateLimiter>>& _slaveRemovalLimiter,
const Flags& _flags)
: ProcessBase("master"),
http(this),
@@ -287,6 +288,8 @@ Master::Master(
metrics(new Metrics(*this)),
electedTime(None())
{
+ slaves.limiter = _slaveRemovalLimiter;
+
// NOTE: We populate 'info_' here instead of inside 'initialize()'
// because 'StandaloneMasterDetector' needs access to the info.
@@ -489,39 +492,11 @@ void Master::initialize()
LOG(INFO) << "Framework rate limiting enabled";
}
- if (flags.slave_removal_rate_limit.isSome()) {
+ // If the rate limiter is injected for testing,
+ // the flag may not be set.
+ if (slaves.limiter.isSome() && flags.slave_removal_rate_limit.isSome()) {
LOG(INFO) << "Slave removal is rate limited to "
<< flags.slave_removal_rate_limit.get();
-
- // Parse the flag value.
- // TODO(vinod): Move this parsing logic to flags once we have a
- // 'Rate' abstraction in stout.
- vector<string> tokens =
- strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
-
- if (tokens.size() != 2) {
- EXIT(1) << "Invalid slave_removal_rate_limit: "
- << flags.slave_removal_rate_limit.get()
- << ". Format is <Number of slaves>/<Duration>";
- }
-
- Try<int> permits = numify<int>(tokens[0]);
- if (permits.isError()) {
- EXIT(1) << "Invalid slave_removal_rate_limit: "
- << flags.slave_removal_rate_limit.get()
- << ". Format is <Number of slaves>/<Duration>"
- << ": " << permits.error();
- }
-
- Try<Duration> duration = Duration::parse(tokens[1]);
- if (duration.isError()) {
- EXIT(1) << "Invalid slave_removal_rate_limit: "
- << flags.slave_removal_rate_limit.get()
- << ". Format is <Number of slaves>/<Duration>"
- << ": " << duration.error();
- }
-
- slaves.limiter = new RateLimiter(permits.get(), duration.get());
}
hashmap<string, RoleInfo> roleInfos;
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e288cdb..3e8a8dc 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -34,6 +34,7 @@
#include <mesos/module/authenticator.hpp>
+#include <process/limiter.hpp>
#include <process/http.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
@@ -104,6 +105,8 @@ public:
MasterContender* contender,
MasterDetector* detector,
const Option<Authorizer*>& authorizer,
+ const Option<memory::shared_ptr<process::RateLimiter>>&
+ slaveRemovalLimiter,
const Flags& flags = Flags());
virtual ~Master();
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index da242d9..a56b654 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -20,6 +20,8 @@
#define __TESTS_CLUSTER_HPP__
#include <map>
+#include <string>
+#include <vector>
#include <mesos/mesos.hpp>
@@ -27,13 +29,16 @@
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
+#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
+#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/foreach.hpp>
#include <stout/none.hpp>
+#include <stout/memory.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/path.hpp>
@@ -101,7 +106,9 @@ public:
Try<process::PID<master::Master> > start(
const master::Flags& flags = master::Flags(),
const Option<master::allocator::Allocator*>& allocator = None(),
- const Option<Authorizer*>& authorizer = None());
+ const Option<Authorizer*>& authorizer = None(),
+ const Option<memory::shared_ptr<process::RateLimiter> >&
+ slaveRemovalLimiter = None());
// Stops and cleans up a master at the specified PID.
Try<Nothing> stop(const process::PID<master::Master>& pid);
@@ -137,6 +144,8 @@ public:
process::Owned<Authorizer> authorizer;
+ Option<memory::shared_ptr<process::RateLimiter>> slaveRemovalLimiter;
+
master::Master* master;
};
@@ -246,7 +255,8 @@ inline void Cluster::Masters::shutdown()
inline Try<process::PID<master::Master> > Cluster::Masters::start(
const master::Flags& flags,
const Option<master::allocator::Allocator*>& allocator,
- const Option<Authorizer*>& authorizer)
+ const Option<Authorizer*>& authorizer,
+ const Option<memory::shared_ptr<process::RateLimiter>>& slaveRemovalLimiter)
{
// Disallow multiple masters when not using ZooKeeper.
if (!masters.empty() && url.isNone()) {
@@ -333,6 +343,40 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
master.authorizer = authorizer__;
}
+ if (slaveRemovalLimiter.isNone() &&
+ flags.slave_removal_rate_limit.isSome()) {
+ // Parse the flag value.
+ // TODO(vinod): Move this parsing logic to flags once we have a
+ // 'Rate' abstraction in stout.
+ std::vector<std::string> tokens =
+ strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+ if (tokens.size() != 2) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>";
+ }
+
+ Try<int> permits = numify<int>(tokens[0]);
+ if (permits.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << permits.error();
+ }
+
+ Try<Duration> duration = Duration::parse(tokens[1]);
+ if (duration.isError()) {
+ EXIT(1) << "Invalid slave_removal_rate_limit: "
+ << flags.slave_removal_rate_limit.get()
+ << ". Format is <Number of slaves>/<Duration>"
+ << ": " << duration.error();
+ }
+
+ master.slaveRemovalLimiter = memory::shared_ptr<process::RateLimiter>(
+ new process::RateLimiter(permits.get(), duration.get()));
+ }
+
master.master = new master::Master(
master.allocator,
master.registrar.get(),
@@ -340,7 +384,10 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
&cluster->files,
master.contender.get(),
master.detector.get(),
- authorizer.isSome() ? authorizer : master.authorizer.get(),
+ authorizer.isSome()
+ ? authorizer : master.authorizer.get(),
+ slaveRemovalLimiter.isSome()
+ ? slaveRemovalLimiter : master.slaveRemovalLimiter,
flags);
if (url.isNone()) {