You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/02/28 03:07:09 UTC

[2/5] mesos git commit: Added ability to inject the RateLimiter for slave removals.

Added ability to inject the RateLimiter for slave removals.

Review: https://reviews.apache.org/r/31512


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ad09e909
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ad09e909
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ad09e909

Branch: refs/heads/master
Commit: ad09e9095bcd4015b7ded07cde4e0e6ac8e1948e
Parents: 1e8f55d
Author: Benjamin Mahler <be...@gmail.com>
Authored: Wed Feb 25 18:26:19 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Feb 27 16:28:51 2015 -0800

----------------------------------------------------------------------
 src/local/local.cpp   | 41 +++++++++++++++++++++++++++++++++++
 src/master/main.cpp   | 43 +++++++++++++++++++++++++++++++++++++
 src/master/master.cpp | 37 ++++++--------------------------
 src/master/master.hpp |  3 +++
 src/tests/cluster.hpp | 53 +++++++++++++++++++++++++++++++++++++++++++---
 5 files changed, 143 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 8189edb..1908336 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -19,15 +19,19 @@
 #include <map>
 #include <set>
 #include <sstream>
+#include <string>
 #include <vector>
 
 #include <mesos/module/anonymous.hpp>
 
+#include <process/limiter.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 
+#include <stout/duration.hpp>
 #include <stout/exit.hpp>
 #include <stout/foreach.hpp>
+#include <stout/memory.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 #include <stout/try.hpp>
@@ -66,6 +70,8 @@
 #include "state/protobuf.hpp"
 #include "state/storage.hpp"
 
+using memory::shared_ptr;
+
 using namespace mesos::internal;
 using namespace mesos::internal::log;
 
@@ -87,6 +93,7 @@ using mesos::modules::ModuleManager;
 
 using process::Owned;
 using process::PID;
+using process::RateLimiter;
 using process::UPID;
 
 using std::map;
@@ -200,6 +207,39 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
       authorizer = authorizer__.release();
     }
 
+    Option<shared_ptr<RateLimiter>> slaveRemovalLimiter = None();
+    if (flags.slave_removal_rate_limit.isSome()) {
+      // Parse the flag value.
+      // TODO(vinod): Move this parsing logic to flags once we have a
+      // 'Rate' abstraction in stout.
+      vector<string> tokens =
+        strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+      if (tokens.size() != 2) {
+        EXIT(1) << "Invalid slave_removal_rate_limit: "
+                << flags.slave_removal_rate_limit.get()
+                << ". Format is <Number of slaves>/<Duration>";
+      }
+
+      Try<int> permits = numify<int>(tokens[0]);
+      if (permits.isError()) {
+        EXIT(1) << "Invalid slave_removal_rate_limit: "
+                << flags.slave_removal_rate_limit.get()
+                << ". Format is <Number of slaves>/<Duration>"
+                << ": " << permits.error();
+      }
+
+      Try<Duration> duration = Duration::parse(tokens[1]);
+      if (duration.isError()) {
+        EXIT(1) << "Invalid slave_removal_rate_limit: "
+                << flags.slave_removal_rate_limit.get()
+                << ". Format is <Number of slaves>/<Duration>"
+                << ": " << duration.error();
+      }
+
+      slaveRemovalLimiter = new RateLimiter(permits.get(), duration.get());
+    }
+
     // Create anonymous modules.
     foreach (const string& name, ModuleManager::find<Anonymous>()) {
       Try<Anonymous*> create = ModuleManager::create<Anonymous>(name);
@@ -224,6 +264,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
         contender,
         detector,
         authorizer,
+        slaveRemovalLimiter,
         flags);
 
     detector->appoint(master->info());

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index f202019..7cce3a0 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -19,17 +19,22 @@
 #include <stdint.h>
 
 #include <set>
+#include <string>
+#include <vector>
 
 #include <mesos/mesos.hpp>
 
 #include <mesos/module/anonymous.hpp>
 
+#include <process/limiter.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 
 #include <stout/check.hpp>
+#include <stout/duration.hpp>
 #include <stout/exit.hpp>
 #include <stout/flags.hpp>
+#include <stout/memory.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
@@ -72,12 +77,15 @@ using namespace mesos::internal::log;
 using namespace mesos::internal::master;
 using namespace zookeeper;
 
+using memory::shared_ptr;
+
 using mesos::MasterInfo;
 
 using mesos::modules::Anonymous;
 using mesos::modules::ModuleManager;
 
 using process::Owned;
+using process::RateLimiter;
 using process::UPID;
 
 using std::cerr;
@@ -85,6 +93,7 @@ using std::cout;
 using std::endl;
 using std::set;
 using std::string;
+using std::vector;
 
 
 void usage(const char* argv0, const flags::FlagsBase& flags)
@@ -287,6 +296,39 @@ int main(int argc, char** argv)
     authorizer = authorizer__.release();
   }
 
+  Option<shared_ptr<RateLimiter>> slaveRemovalLimiter = None();
+  if (flags.slave_removal_rate_limit.isSome()) {
+    // Parse the flag value.
+    // TODO(vinod): Move this parsing logic to flags once we have a
+    // 'Rate' abstraction in stout.
+    vector<string> tokens =
+      strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+    if (tokens.size() != 2) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>";
+    }
+
+    Try<int> permits = numify<int>(tokens[0]);
+    if (permits.isError()) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>"
+              << ": " << permits.error();
+    }
+
+    Try<Duration> duration = Duration::parse(tokens[1]);
+    if (duration.isError()) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>"
+              << ": " << duration.error();
+    }
+
+    slaveRemovalLimiter = new RateLimiter(permits.get(), duration.get());
+  }
+
   // Create anonymous modules.
   foreach (const string& name, ModuleManager::find<Anonymous>()) {
     Try<Anonymous*> create = ModuleManager::create<Anonymous>(name);
@@ -314,6 +356,7 @@ int main(int argc, char** argv)
       contender,
       detector,
       authorizer,
+      slaveRemovalLimiter,
       flags);
 
   if (zk.isNone()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 76e217d..4a1b428 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -273,6 +273,7 @@ Master::Master(
     MasterContender* _contender,
     MasterDetector* _detector,
     const Option<Authorizer*>& _authorizer,
+    const Option<shared_ptr<RateLimiter>>& _slaveRemovalLimiter,
     const Flags& _flags)
   : ProcessBase("master"),
     http(this),
@@ -287,6 +288,8 @@ Master::Master(
     metrics(new Metrics(*this)),
     electedTime(None())
 {
+  slaves.limiter = _slaveRemovalLimiter;
+
   // NOTE: We populate 'info_' here instead of inside 'initialize()'
   // because 'StandaloneMasterDetector' needs access to the info.
 
@@ -489,39 +492,11 @@ void Master::initialize()
     LOG(INFO) << "Framework rate limiting enabled";
   }
 
-  if (flags.slave_removal_rate_limit.isSome()) {
+  // If the rate limiter is injected for testing,
+  // the flag may not be set.
+  if (slaves.limiter.isSome() && flags.slave_removal_rate_limit.isSome()) {
     LOG(INFO) << "Slave removal is rate limited to "
               << flags.slave_removal_rate_limit.get();
-
-    // Parse the flag value.
-    // TODO(vinod): Move this parsing logic to flags once we have a
-    // 'Rate' abstraction in stout.
-    vector<string> tokens =
-      strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
-
-    if (tokens.size() != 2) {
-      EXIT(1) << "Invalid slave_removal_rate_limit: "
-              << flags.slave_removal_rate_limit.get()
-              << ". Format is <Number of slaves>/<Duration>";
-    }
-
-    Try<int> permits = numify<int>(tokens[0]);
-    if (permits.isError()) {
-      EXIT(1) << "Invalid slave_removal_rate_limit: "
-              << flags.slave_removal_rate_limit.get()
-              << ". Format is <Number of slaves>/<Duration>"
-              << ": " << permits.error();
-    }
-
-    Try<Duration> duration = Duration::parse(tokens[1]);
-    if (duration.isError()) {
-      EXIT(1) << "Invalid slave_removal_rate_limit: "
-              << flags.slave_removal_rate_limit.get()
-              << ". Format is <Number of slaves>/<Duration>"
-              << ": " << duration.error();
-    }
-
-    slaves.limiter = new RateLimiter(permits.get(), duration.get());
   }
 
   hashmap<string, RoleInfo> roleInfos;

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e288cdb..3e8a8dc 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -34,6 +34,7 @@
 
 #include <mesos/module/authenticator.hpp>
 
+#include <process/limiter.hpp>
 #include <process/http.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
@@ -104,6 +105,8 @@ public:
          MasterContender* contender,
          MasterDetector* detector,
          const Option<Authorizer*>& authorizer,
+         const Option<memory::shared_ptr<process::RateLimiter>>&
+           slaveRemovalLimiter,
          const Flags& flags = Flags());
 
   virtual ~Master();

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad09e909/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index da242d9..a56b654 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -20,6 +20,8 @@
 #define __TESTS_CLUSTER_HPP__
 
 #include <map>
+#include <string>
+#include <vector>
 
 #include <mesos/mesos.hpp>
 
@@ -27,13 +29,16 @@
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
+#include <process/limiter.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 #include <process/process.hpp>
 
+#include <stout/duration.hpp>
 #include <stout/error.hpp>
 #include <stout/foreach.hpp>
 #include <stout/none.hpp>
+#include <stout/memory.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/path.hpp>
@@ -101,7 +106,9 @@ public:
     Try<process::PID<master::Master> > start(
         const master::Flags& flags = master::Flags(),
         const Option<master::allocator::Allocator*>& allocator = None(),
-        const Option<Authorizer*>& authorizer = None());
+        const Option<Authorizer*>& authorizer = None(),
+        const Option<memory::shared_ptr<process::RateLimiter> >&
+          slaveRemovalLimiter = None());
 
     // Stops and cleans up a master at the specified PID.
     Try<Nothing> stop(const process::PID<master::Master>& pid);
@@ -137,6 +144,8 @@ public:
 
       process::Owned<Authorizer> authorizer;
 
+      Option<memory::shared_ptr<process::RateLimiter>> slaveRemovalLimiter;
+
       master::Master* master;
     };
 
@@ -246,7 +255,8 @@ inline void Cluster::Masters::shutdown()
 inline Try<process::PID<master::Master> > Cluster::Masters::start(
     const master::Flags& flags,
     const Option<master::allocator::Allocator*>& allocator,
-    const Option<Authorizer*>& authorizer)
+    const Option<Authorizer*>& authorizer,
+    const Option<memory::shared_ptr<process::RateLimiter>>& slaveRemovalLimiter)
 {
   // Disallow multiple masters when not using ZooKeeper.
   if (!masters.empty() && url.isNone()) {
@@ -333,6 +343,40 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
     master.authorizer = authorizer__;
   }
 
+  if (slaveRemovalLimiter.isNone() &&
+      flags.slave_removal_rate_limit.isSome()) {
+    // Parse the flag value.
+    // TODO(vinod): Move this parsing logic to flags once we have a
+    // 'Rate' abstraction in stout.
+    std::vector<std::string> tokens =
+      strings::tokenize(flags.slave_removal_rate_limit.get(), "/");
+
+    if (tokens.size() != 2) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>";
+    }
+
+    Try<int> permits = numify<int>(tokens[0]);
+    if (permits.isError()) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>"
+              << ": " << permits.error();
+    }
+
+    Try<Duration> duration = Duration::parse(tokens[1]);
+    if (duration.isError()) {
+      EXIT(1) << "Invalid slave_removal_rate_limit: "
+              << flags.slave_removal_rate_limit.get()
+              << ". Format is <Number of slaves>/<Duration>"
+              << ": " << duration.error();
+    }
+
+    master.slaveRemovalLimiter = memory::shared_ptr<process::RateLimiter>(
+        new process::RateLimiter(permits.get(), duration.get()));
+  }
+
   master.master = new master::Master(
       master.allocator,
       master.registrar.get(),
@@ -340,7 +384,10 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
       &cluster->files,
       master.contender.get(),
       master.detector.get(),
-      authorizer.isSome() ? authorizer : master.authorizer.get(),
+      authorizer.isSome()
+          ? authorizer : master.authorizer.get(),
+      slaveRemovalLimiter.isSome()
+          ? slaveRemovalLimiter : master.slaveRemovalLimiter,
       flags);
 
   if (url.isNone()) {