You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2014/06/23 20:25:44 UTC

[1/6] git commit: Added support for optionally throttling the frameworks not specified in RateLimits config.

Repository: mesos
Updated Branches:
  refs/heads/master bdca37fe6 -> f943e2fc8


Added support for optionally throttling the frameworks not specified in RateLimits config.

Review: https://reviews.apache.org/r/22777


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f943e2fc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f943e2fc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f943e2fc

Branch: refs/heads/master
Commit: f943e2fc8ecd06b2c3b807e7ad6781cec21617c3
Parents: 1cd0a07
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Fri Jun 20 00:29:10 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Mon Jun 23 11:19:40 2014 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto         | 13 ++---
 src/master/flags.hpp              |  3 +-
 src/master/master.cpp             | 96 ++++++++++++++++++++++++++--------
 src/master/master.hpp             |  9 +++-
 src/tests/rate_limiting_tests.cpp |  6 +--
 5 files changed, 92 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f943e2fc/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 2f6be05..6968411 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -765,14 +765,15 @@ message RateLimit {
 
 /**
  * Collection of RateLimit.
- * Frameworks without rate limits defined here are not throttled.
- * TODO(xujyan): Currently when a framework is not specified in 'limits' it is
- * not throttled. This can be done more explicitly by adding a RateLimit entry
- * without setting its 'qps'. We should consider adding an optional
- * 'aggregate_default_qps' which can be used to throttle the frameworks not
- * specified here.
+ * Frameworks without rate limits defined here are not throttled
+ * unless 'aggregate_default_qps' is specified.
  */
 message RateLimits {
   // Items should have unique principals.
   repeated RateLimit limits = 1;
+
+  // All the frameworks not specified in 'limits' get this default
+  // rate. This rate is an aggregate rate for all of them, i.e.,
+  // their combined traffic is throttled together at this rate.
+  optional double aggregate_default_qps = 2;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/f943e2fc/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index 70751d2..cd2a70e 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -258,7 +258,8 @@ public:
         "    {\n"
         "      \"principal\": \"bar\"\n"
         "    }\n"
-        "  ]\n"
+        "  ],\n"
+        "  \"aggregate_default_qps\": 33.3\n"
         "}");
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f943e2fc/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f376e60..bde0e57 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -374,6 +374,18 @@ void Master::initialize()
             : Option<Owned<RateLimiter> >::none());
     }
 
+    if (flags.rate_limits.get().has_aggregate_default_qps() &&
+        flags.rate_limits.get().aggregate_default_qps() <= 0) {
+      EXIT(1) << "Invalid aggregate_default_qps: "
+              << flags.rate_limits.get().aggregate_default_qps()
+              << ". It must be a positive number";
+    }
+
+    if (flags.rate_limits.get().has_aggregate_default_qps()) {
+      defaultLimiter = Owned<RateLimiter>(
+          new RateLimiter(flags.rate_limits.get().aggregate_default_qps()));
+    }
+
     LOG(INFO) << "Framework rate limiting enabled";
   }
 
@@ -779,13 +791,30 @@ void Master::exited(const UPID& pid)
 
 void Master::visit(const MessageEvent& event)
 {
+  // There are three cases about the message's UPID with respect to
+  // 'frameworks.principals':
+  // 1) if a <UPID, principal> pair exists and the principal is Some,
+  //    it's a framework with its principal specified.
+  // 2) if a <UPID, principal> pair exists and the principal is None,
+  //    it's a framework without a principal.
+  // 3) if a <UPID, principal> pair does not exist in the map, it's
+  //    either an unregistered framework or not a framework.
+  // The logic for framework message counters and rate limiting is
+  // mainly concerned with whether the UPID is a *registered*
+  // framework and whether the framework has a principal, so we use
+  // these two temp variables to simplify the condition checks below.
+  bool isRegisteredFramework =
+    frameworks.principals.contains(event.message->from);
+  const Option<string> principal = isRegisteredFramework
+    ? frameworks.principals[event.message->from]
+    : Option<string>::none();
+
   // Increment the "message_received" counter if the message is from
   // a framework and such a counter is configured for it.
   // See comments for 'Master::Metrics::Frameworks' and
   // 'Master::Frameworks::principals' for details.
-  const Option<string> principal =
-    frameworks.principals.get(event.message->from);
   if (principal.isSome()) {
+    // If the framework has a principal, the counter must exist.
     CHECK(metrics.frameworks.contains(principal.get()));
     Counter messages_received =
       metrics.frameworks.get(principal.get()).get()->messages_received;
@@ -814,20 +843,29 @@ void Master::visit(const MessageEvent& event)
     return;
   }
 
+  // Necessary to disambiguate below.
+  typedef void(Self::*F)(const MessageEvent&);
+
   // Throttle the message if it's a framework message and a
   // RateLimiter is configured for the framework's principal.
+  // The framework is throttled by the default RateLimiter if:
+  // 1) the default RateLimiter is configured (and)
+  // 2) the framework doesn't have a principal or its principal is
+  //    not specified in 'flags.rate_limits'.
   // The framework is not throttled if:
-  // 1) the principal is not specified by the framework (or)
-  // 2) the principal doesn't exist in RateLimits configuration (or)
-  // 3) the principal exists in RateLimits but 'qps' is not set.
+  // 1) the default RateLimiter is not configured to handle case 2)
+  //    above. (or)
+  // 2) the principal exists in RateLimits but 'qps' is not set.
   if (principal.isSome() &&
       limiters.contains(principal.get()) &&
       limiters[principal.get()].isSome()) {
-    // Necessary to disambiguate.
-    typedef void(Self::*F)(const MessageEvent&);
-
     limiters[principal.get()].get()->acquire()
       .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+  } else if ((principal.isNone() || !limiters.contains(principal.get())) &&
+             isRegisteredFramework &&
+             defaultLimiter.isSome()) {
+    defaultLimiter.get()->acquire()
+      .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
   } else {
     _visit(event);
   }
@@ -836,22 +874,30 @@ void Master::visit(const MessageEvent& event)
 
 void Master::visit(const process::ExitedEvent& event)
 {
-  // Throttle the message if it's a framework message and a
-  // RateLimiter is configured for the framework's principal.
+  // See comments in 'visit(const MessageEvent& event)' for which
+  // RateLimiter is used to throttle this UPID and when it is not
+  // throttled.
   // Note that throttling ExitedEvent is necessary so the order
   // between MessageEvents and ExitedEvents from the same PID is
   // maintained.
-  // See comments in 'visit(const MessageEvent& event)' for when a
-  // message is not throttled.
-  const Option<string> principal = frameworks.principals.get(event.pid);
+  bool isRegisteredFramework = frameworks.principals.contains(event.pid);
+  const Option<string> principal = isRegisteredFramework
+    ? frameworks.principals[event.pid]
+    : Option<string>::none();
+
+  // Necessary to disambiguate below.
+  typedef void(Self::*F)(const ExitedEvent&);
+
   if (principal.isSome() &&
       limiters.contains(principal.get()) &&
       limiters[principal.get()].isSome()) {
-    // Necessary to disambiguate.
-    typedef void(Self::*F)(const ExitedEvent&);
-
     limiters[principal.get()].get()->acquire()
       .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+  } else if ((principal.isNone() || !limiters.contains(principal.get())) &&
+             isRegisteredFramework &&
+             defaultLimiter.isSome()) {
+    defaultLimiter.get()->acquire()
+      .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
   } else {
     _visit(event);
   }
@@ -864,7 +910,9 @@ void Master::_visit(const MessageEvent& event)
   // mapping may be deleted in handling 'UnregisterFrameworkMessage'
   // but its counter still needs to be incremented for this message.
   const Option<string> principal =
-    frameworks.principals.get(event.message->from);
+    frameworks.principals.contains(event.message->from)
+      ? frameworks.principals[event.message->from]
+      : Option<string>::none();
 
   ProtobufProcess<Master>::visit(event);
 
@@ -3688,11 +3736,11 @@ void Master::addFramework(Framework* framework)
     principal = framework->info.principal();
   }
 
+  CHECK(!frameworks.principals.contains(framework->pid));
+  frameworks.principals.put(framework->pid, principal);
+
   // Export framework metrics if a principal is specified.
   if (principal.isSome()) {
-    CHECK(!frameworks.principals.contains(framework->pid));
-    frameworks.principals.put(framework->pid, principal.get());
-
     // Create new framework metrics if this framework is the first
     // one of this principal. Otherwise existing metrics are reused.
     if (!metrics.frameworks.contains(principal.get())) {
@@ -3843,11 +3891,13 @@ void Master::removeFramework(Framework* framework)
   // Remove the framework from authenticated.
   authenticated.erase(framework->pid);
 
+  CHECK(frameworks.principals.contains(framework->pid));
+  const Option<string> principal = frameworks.principals[framework->pid];
+
+  frameworks.principals.erase(framework->pid);
+
   // Remove the framework's message counters.
-  const Option<string> principal = frameworks.principals.get(framework->pid);
   if (principal.isSome()) {
-    frameworks.principals.erase(framework->pid);
-
     // Remove the metrics for the principal if this framework is the
     // last one with this principal.
     if (!frameworks.principals.containsValue(principal.get())) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f943e2fc/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index f4cdb26..5fef354 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -495,13 +495,14 @@ private:
 
     // Principals of frameworks keyed by PID.
     // NOTE: Multiple PIDs can map to the same principal. The
-    // differences between this map and 'authenticated' are:
+    // principal is None when the framework doesn't specify it.
+    // The differences between this map and 'authenticated' are:
     // 1) This map only includes *registered* frameworks. The mapping
     //    is added when a framework (re-)registers.
     // 2) This map includes unauthenticated frameworks (when Master
     //    allows them) if they have principals specified in
     //    FrameworkInfo.
-    hashmap<process::UPID, std::string> principals;
+    hashmap<process::UPID, Option<std::string> > principals;
   } frameworks;
 
   hashmap<OfferID, Offer*> offers;
@@ -724,6 +725,10 @@ private:
   // Like Metrics::Frameworks, all frameworks of the same principal
   // are throttled together at a common rate limit.
   hashmap<std::string, Option<process::Owned<process::RateLimiter> > > limiters;
+
+  // The default limiter is for frameworks not specified in
+  // 'flags.rate_limits'.
+  Option<process::Owned<process::RateLimiter> > defaultLimiter;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f943e2fc/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index 9cb3717..fc23a19 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -356,13 +356,13 @@ TEST_F(RateLimitingTest, DifferentPrincipalFrameworks)
   master::Flags flags = CreateMasterFlags();
 
   // Configure RateLimits to be 1qps and 0.5qps for two frameworks.
+  // Rate for the second framework is implicitly specified via
+  // 'aggregate_default_qps'.
   RateLimits limits;
   RateLimit* limit1 = limits.mutable_limits()->Add();
   limit1->set_principal("framework1");
   limit1->set_qps(1);
-  RateLimit* limit2 = limits.mutable_limits()->Add();
-  limit2->set_principal("framework2");
-  limit2->set_qps(0.5);
+  limits.set_aggregate_default_qps(0.5);
   flags.rate_limits = limits;
 
   flags.authenticate_frameworks = false;


[3/6] git commit: Added a log line to indicate framework rate limiting is enabled.

Posted by ya...@apache.org.
Added a log line to indicate framework rate limiting is enabled.

Review: https://reviews.apache.org/r/22745


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bd8343a6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bd8343a6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bd8343a6

Branch: refs/heads/master
Commit: bd8343a634152ca1cb3c6043c0a4ab4f41619b4e
Parents: bdca37f
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Wed Jun 18 10:57:36 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Mon Jun 23 11:19:40 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bd8343a6/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 00152f5..72470da 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -381,6 +381,8 @@ void Master::initialize()
                 Owned<RateLimiter>(new RateLimiter(limit_.qps())))
             : Option<Owned<RateLimiter> >::none());
     }
+
+    LOG(INFO) << "Framework rate limiting enabled";
   }
 
   hashmap<string, RoleInfo> roleInfos;


[2/6] git commit: Added more tests for framework rate limiting.

Posted by ya...@apache.org.
Added more tests for framework rate limiting.

Review: https://reviews.apache.org/r/22740


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ac44eb59
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ac44eb59
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ac44eb59

Branch: refs/heads/master
Commit: ac44eb59beef16cf7383bc79834d7a4b62fe5567
Parents: a6236a4
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Fri Jun 20 14:56:37 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Mon Jun 23 11:19:40 2014 -0700

----------------------------------------------------------------------
 src/tests/rate_limiting_tests.cpp | 805 +++++++++++++++++++++++++--------
 1 file changed, 604 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ac44eb59/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index 2a37fc1..9775b77 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -51,6 +51,10 @@ using testing::_;
 using testing::Eq;
 using testing::Return;
 
+namespace mesos {
+namespace internal {
+namespace master {
+
 // Query Mesos metrics snapshot endpoint and return a JSON::Object
 // result.
 #define METRICS_SNAPSHOT                                                       \
@@ -70,27 +74,68 @@ using testing::Return;
 
 // This test case covers tests related to framework API rate limiting
 // which includes metrics exporting for API call rates.
-class RateLimitingTest : public MesosTest {};
+class RateLimitingTest : public MesosTest
+{
+public:
+  virtual master::Flags CreateMasterFlags()
+  {
+    master::Flags flags = MesosTest::CreateMasterFlags();
+
+    RateLimits limits;
+    RateLimit* limit = limits.mutable_limits()->Add();
+    limit->set_principal(DEFAULT_CREDENTIAL.principal());
+    // Set 1qps so that the half-second Clock::advance()s for
+    // metrics endpoint (because it also throttles requests but at
+    // 2qps) don't mess with framework rate limiting.
+    limit->set_qps(1);
+    flags.rate_limits = JSON::Protobuf(limits);
+
+    return flags;
+  }
+};
 
 
 // Verify that message counters for a framework are added when a
-// framework registers and removed when it terminates.
-TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
+// framework registers, removed when it terminates and count messages
+// correctly when it is given unlimited rate.
+TEST_F(RateLimitingTest, NoRateLimiting)
 {
-  Try<PID<Master> > master = StartMaster();
+  // Give the framework unlimited rate explicitly by specifying a
+  // RateLimit entry without 'qps'.
+  master::Flags flags = CreateMasterFlags();
+  RateLimits limits;
+  RateLimit* limit = limits.mutable_limits()->Add();
+  limit->set_principal(DEFAULT_CREDENTIAL.principal());
+  flags.rate_limits = JSON::Protobuf(limits);
+
+  Try<PID<Master> > master = StartMaster(flags);
   ASSERT_SOME(master);
 
+  Clock::pause();
+
+  // Settle to make sure master is ready for incoming requests, i.e.,
+  // '_recover()' completes.
+  Clock::settle();
+
+  // Advance before the test so that the 1st call to Metrics endpoint
+  // is not throttled. MetricsProcess which hosts the endpoint
+  // throttles requests at 2qps and its singleton instance is shared
+  // across tests.
+  Clock::advance(Milliseconds(501));
+
   // Message counters not present before the framework registers.
   {
     JSON::Object metrics = METRICS_SNAPSHOT;
 
     EXPECT_EQ(
-        0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
-                                 "/messages_received"));
+        0u,
+        metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+                             "/messages_received"));
 
     EXPECT_EQ(
-        0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
-                                 "/messages_processed"));
+        0u,
+        metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+                             "/messages_processed"));
   }
 
   MockScheduler sched;
@@ -99,25 +144,49 @@ TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
   MesosSchedulerDriver* driver = new MesosSchedulerDriver(
       &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
 
-  Future<Nothing> registered;
   EXPECT_CALL(sched, registered(driver, _, _))
-    .WillOnce(FutureSatisfy(&registered));
+    .Times(1);
+
+  // Grab the stuff we need to replay the RegisterFrameworkMessage.
+  Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+      RegisterFrameworkMessage(), _, master.get());
+  Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+      Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
 
   ASSERT_EQ(DRIVER_RUNNING, driver->start());
 
-  AWAIT_READY(registered);
+  AWAIT_READY(registerFrameworkMessage);
+  AWAIT_READY(frameworkRegisteredMessage);
+
+  const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+  // For metrics endpoint.
+  Clock::advance(Milliseconds(501));
 
-  // Message counters added after the framework is registered.
+  // Send a duplicate RegisterFrameworkMessage. Master sends
+  // FrameworkRegisteredMessage back after processing it.
   {
+    Future<process::Message> duplicateFrameworkRegisteredMessage =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     _);
+
+    process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+    AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+    // Verify that one message is received and processed (after
+    // registration).
     JSON::Object metrics = METRICS_SNAPSHOT;
 
-    EXPECT_EQ(
-        1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
-                                 "/messages_received"));
+    const string& messages_received =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
 
-    EXPECT_EQ(
-        1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
-                                 "/messages_processed"));
+    const string& messages_processed =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+    EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
   }
 
   Future<Nothing> frameworkRemoved =
@@ -127,36 +196,195 @@ TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
   driver->join();
   delete driver;
 
+  // The fact that UnregisterFrameworkMessage (the 2nd message from
+  // 'sched' that reaches Master after its registration) gets
+  // processed without Clock advances proves that the framework is
+  // given unlimited rate.
   AWAIT_READY(frameworkRemoved);
 
-  // Message counter removed after the framework is unregistered.
+  // For metrics endpoint.
+  Clock::advance(Milliseconds(501));
+
+  // Message counters removed after the framework is unregistered.
   {
     JSON::Object metrics = METRICS_SNAPSHOT;
 
     EXPECT_EQ(
-        0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
-                                 "/messages_received"));
+        0u,
+        metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+                             "/messages_received"));
 
     EXPECT_EQ(
-        0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
-                                 "/messages_processed"));
+        0u,
+        metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+                             "/messages_processed"));
   }
 
   Shutdown();
 }
 
 
-// Verify that framework message counters work with frameworks of
-// different principals.
-TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
+// Verify that a framework is being correctly throttled at the
+// configured rate.
+TEST_F(RateLimitingTest, RateLimitingEnabled)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  Clock::pause();
+
+  // Settle to make sure master is ready for incoming requests, i.e.,
+  // '_recover()' completes.
+  Clock::settle();
+
+  // Advance before the test so that the 1st call to Metrics endpoint
+  // is not throttled. MetricsProcess which hosts the endpoint
+  // throttles requests at 2qps and its singleton instance is shared
+  // across tests.
+  Clock::advance(Milliseconds(501));
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  // Grab the stuff we need to replay the RegisterFrameworkMessage.
+  Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+      RegisterFrameworkMessage(), _, master.get());
+  Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+      Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+  ASSERT_EQ(DRIVER_RUNNING, driver.start());
+
+  AWAIT_READY(registerFrameworkMessage);
+  AWAIT_READY(frameworkRegisteredMessage);
+
+  const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+  // Keep sending duplicate RegisterFrameworkMessages. Master sends
+  // FrameworkRegisteredMessage back after processing each of them.
+  {
+    Future<process::Message> duplicateFrameworkRegisteredMessage =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     _);
+
+    process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+
+    // The first message is not throttled because it's at the head of
+    // the queue.
+    AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+    // Verify that one message is received and processed (after
+    // registration).
+    JSON::Object metrics = METRICS_SNAPSHOT;
+
+    const string& messages_received =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
+
+    const string& messages_processed =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+    EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+  }
+
+  // The 2nd message is throttled for a second.
+  Future<process::Message> duplicateFrameworkRegisteredMessage =
+    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                   master.get(),
+                   _);
+
+  process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+
+  // Advance for half a second and verify that the message is still
+  // not processed.
+  Clock::advance(Milliseconds(501));
+
+  // Settle to make sure all events not delayed are processed.
+  Clock::settle();
+
+  {
+    JSON::Object metrics = METRICS_SNAPSHOT;
+
+    const string& messages_received =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    const string& messages_processed =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+
+    // The 2nd message is received but not processed after half
+    // a second because of throttling.
+    EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
+    EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+    EXPECT_TRUE(duplicateFrameworkRegisteredMessage.isPending());
+  }
+
+  // After another half a second the message should be processed.
+  Clock::advance(Milliseconds(501));
+  AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+  // Verify counters after processing of the message.
+  JSON::Object metrics = METRICS_SNAPSHOT;
+
+  const string& messages_received =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+  EXPECT_EQ(1u, metrics.values.count(messages_received));
+  const string& messages_processed =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+  EXPECT_EQ(1u, metrics.values.count(messages_processed));
+
+  EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
+  EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value);
+
+  EXPECT_EQ(DRIVER_STOPPED, driver.stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver.join());
+
+  Shutdown();
+}
+
+
+// Verify that framework message counters and rate limiters work with
+// frameworks of different principals which are throttled at
+// different rates.
+TEST_F(RateLimitingTest, DifferentPrincipalFrameworks)
 {
-  // 1. Register two frameworks.
   master::Flags flags = CreateMasterFlags();
+
+  // Configure RateLimits to be 1qps and 0.5qps for two frameworks.
+  RateLimits limits;
+  RateLimit* limit1 = limits.mutable_limits()->Add();
+  limit1->set_principal("framework1");
+  limit1->set_qps(1);
+  RateLimit* limit2 = limits.mutable_limits()->Add();
+  limit2->set_principal("framework2");
+  limit2->set_qps(0.5);
+  flags.rate_limits = JSON::Protobuf(limits);
+
   flags.authenticate_frameworks = false;
 
   Try<PID<Master> > master = StartMaster(flags);
   ASSERT_SOME(master);
 
+  Clock::pause();
+
+  // Settle to make sure master is ready for incoming requests, i.e.,
+  // '_recover()' completes.
+  Clock::settle();
+
+  // Advance before the test so that the 1st call to Metrics endpoint
+  // is not throttled. MetricsProcess which hosts the endpoint
+  // throttles requests at 2qps and its singleton instance is shared
+  // across tests.
+  Clock::advance(Milliseconds(501));
+
+  // 1. Register two frameworks.
+
+  // 1.1. Create the first framework.
   FrameworkInfo frameworkInfo1; // Bug in gcc 4.1.*, must assign on next line.
   frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
   frameworkInfo1.set_principal("framework1");
@@ -167,16 +395,24 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
   MesosSchedulerDriver* driver1 =
     new MesosSchedulerDriver(&sched1, frameworkInfo1, master.get());
 
-  {
-    Future<Nothing> registered;
-    EXPECT_CALL(sched1, registered(driver1, _, _))
-      .WillOnce(FutureSatisfy(&registered));
+  EXPECT_CALL(sched1, registered(driver1, _, _))
+    .Times(1);
 
-    ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+  // Grab the stuff we need to replay the RegisterFrameworkMessage
+  // for sched1.
+  Future<RegisterFrameworkMessage> registerFrameworkMessage1 = FUTURE_PROTOBUF(
+      RegisterFrameworkMessage(), _, master.get());
+  Future<process::Message> frameworkRegisteredMessage1 = FUTURE_MESSAGE(
+      Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
 
-    AWAIT_READY(registered);
-  }
+  ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+
+  AWAIT_READY(registerFrameworkMessage1);
+  AWAIT_READY(frameworkRegisteredMessage1);
 
+  const process::UPID sched1Pid = frameworkRegisteredMessage1.get().to;
+
+  // 1.2. Create the second framework.
   FrameworkInfo frameworkInfo2; // Bug in gcc 4.1.*, must assign on next line.
   frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
   frameworkInfo2.set_principal("framework2");
@@ -184,17 +420,123 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
   MockScheduler sched2;
   MesosSchedulerDriver driver2(&sched2, frameworkInfo2, master.get());
 
+  EXPECT_CALL(sched2, registered(&driver2, _, _))
+    .Times(1);
+
+  // Grab the stuff we need to replay the RegisterFrameworkMessage
+  // for sched2.
+  Future<RegisterFrameworkMessage> registerFrameworkMessage2 = FUTURE_PROTOBUF(
+      RegisterFrameworkMessage(), _, master.get());
+  Future<process::Message> frameworkRegisteredMessage2 = FUTURE_MESSAGE(
+      Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+  ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+  AWAIT_READY(registerFrameworkMessage2);
+  AWAIT_READY(frameworkRegisteredMessage2);
+
+  const process::UPID sched2Pid = frameworkRegisteredMessage2.get().to;
+
+  // 2. Send duplicate RegisterFrameworkMessages from the two
+  // schedulers to Master.
+
+  // The first messages are not throttled because they are at the
+  // head of the queue.
   {
-    Future<Nothing> registered;
-    EXPECT_CALL(sched2, registered(&driver2, _, _))
-      .WillOnce(FutureSatisfy(&registered));
+    Future<process::Message> duplicateFrameworkRegisteredMessage1 =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     sched1Pid);
+    Future<process::Message> duplicateFrameworkRegisteredMessage2 =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     sched2Pid);
+
+    process::post(sched1Pid, master.get(), registerFrameworkMessage1.get());
+    process::post(sched2Pid, master.get(), registerFrameworkMessage2.get());
 
-    ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+    AWAIT_READY(duplicateFrameworkRegisteredMessage1);
+    AWAIT_READY(duplicateFrameworkRegisteredMessage2);
+  }
 
-    AWAIT_READY(registered);
+  // Send the second batch of messages which should be throttled.
+  {
+    Future<process::Message> duplicateFrameworkRegisteredMessage1 =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     sched1Pid);
+    Future<process::Message> duplicateFrameworkRegisteredMessage2 =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     sched2Pid);
+
+    process::post(sched1Pid, master.get(), registerFrameworkMessage1.get());
+    process::post(sched2Pid, master.get(), registerFrameworkMessage2.get());
+
+    // Settle to make sure the pending futures below are indeed due
+    // to throttling.
+    Clock::settle();
+
+    EXPECT_TRUE(duplicateFrameworkRegisteredMessage1.isPending());
+    EXPECT_TRUE(duplicateFrameworkRegisteredMessage2.isPending());
+
+    {
+      // Verify counters also indicate that messages are received but
+      // not processed.
+      JSON::Object metrics = METRICS_SNAPSHOT;
+
+      EXPECT_EQ(
+          1u, metrics.values.count("frameworks/framework1/messages_received"));
+      EXPECT_EQ(
+          1u, metrics.values.count("frameworks/framework1/messages_processed"));
+      EXPECT_EQ(
+          1u, metrics.values.count("frameworks/framework2/messages_received"));
+      EXPECT_EQ(
+          1u, metrics.values.count("frameworks/framework2/messages_processed"));
+
+      EXPECT_EQ(
+          2,
+          metrics.values["frameworks/framework1/messages_received"]
+            .as<JSON::Number>().value);
+      EXPECT_EQ(
+          2,
+          metrics.values["frameworks/framework2/messages_received"]
+            .as<JSON::Number>().value);
+      EXPECT_EQ(
+          1,
+          metrics.values["frameworks/framework1/messages_processed"]
+            .as<JSON::Number>().value);
+      EXPECT_EQ(
+          1,
+          metrics.values["frameworks/framework2/messages_processed"]
+            .as<JSON::Number>().value);
+    }
+
+    // Advance for a second so the message from framework1 (1qps)
+    // should be processed.
+    Clock::advance(Seconds(1));
+    AWAIT_READY(duplicateFrameworkRegisteredMessage1);
+    EXPECT_TRUE(duplicateFrameworkRegisteredMessage2.isPending());
+
+    // Framework1's message is processed and framework2's is not
+    // because it's throttled at a lower rate.
+    JSON::Object metrics = METRICS_SNAPSHOT;
+    EXPECT_EQ(
+        2,
+        metrics.values["frameworks/framework1/messages_processed"]
+          .as<JSON::Number>().value);
+    EXPECT_EQ(
+        1,
+        metrics.values["frameworks/framework2/messages_processed"]
+          .as<JSON::Number>().value);
+
+    // After another second, the message from framework2 (0.2qps) is
+    // processed as well.
+    Clock::advance(Seconds(1));
+    AWAIT_READY(duplicateFrameworkRegisteredMessage2);
   }
 
-  // 2. Verify that both frameworks have message counters added.
+  // 2. Counters confirm that both frameworks' messages are processed.
   {
     JSON::Object metrics = METRICS_SNAPSHOT;
 
@@ -206,9 +548,26 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
         1u, metrics.values.count("frameworks/framework2/messages_received"));
     EXPECT_EQ(
         1u, metrics.values.count("frameworks/framework2/messages_processed"));
+    EXPECT_EQ(
+        2,
+        metrics.values["frameworks/framework1/messages_received"]
+          .as<JSON::Number>().value);
+    EXPECT_EQ(
+        2,
+        metrics.values["frameworks/framework2/messages_received"]
+          .as<JSON::Number>().value);
+    EXPECT_EQ(
+        2,
+        metrics.values["frameworks/framework1/messages_processed"]
+          .as<JSON::Number>().value);
+    EXPECT_EQ(
+        2,
+        metrics.values["frameworks/framework2/messages_processed"]
+          .as<JSON::Number>().value);
   }
 
-  // 3. Remove a framework.
+  // 3. Remove a framework and its message counters are deleted while
+  // the other framework's counters stay.
   Future<Nothing> frameworkRemoved =
     FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
 
@@ -216,22 +575,28 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
   driver1->join();
   delete driver1;
 
+  // No need to advance again because we already advanced 1sec for
+  // sched2 so the RateLimiter for sched1 doesn't impose a delay this
+  // time.
   AWAIT_READY(frameworkRemoved);
 
-  // 4. Its message counters are deleted while the other framework's
-  //    counters stay.
-  {
-    JSON::Object metrics = METRICS_SNAPSHOT;
+  // Settle to avoid the race between the removal of the counters and
+  // the metrics endpoint query.
+  Clock::settle();
 
-    EXPECT_EQ(
-        0u, metrics.values.count("frameworks/framework1/messages_received"));
-    EXPECT_EQ(
-        0u, metrics.values.count("frameworks/framework1/messages_processed"));
-    EXPECT_EQ(
-        1u, metrics.values.count("frameworks/framework2/messages_received"));
-    EXPECT_EQ(
-        1u, metrics.values.count("frameworks/framework2/messages_processed"));
-  }
+  // Advance for Metrics rate limiting.
+  Clock::advance(Milliseconds(501));
+
+  JSON::Object metrics = METRICS_SNAPSHOT;
+
+  EXPECT_EQ(
+      0u, metrics.values.count("frameworks/framework1/messages_received"));
+  EXPECT_EQ(
+      0u, metrics.values.count("frameworks/framework1/messages_processed"));
+  EXPECT_EQ(
+      1u, metrics.values.count("frameworks/framework2/messages_received"));
+  EXPECT_EQ(
+      1u, metrics.values.count("frameworks/framework2/messages_processed"));
 
   driver2.stop();
   driver2.join();
@@ -241,56 +606,135 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
 
 
 // Verify that if multiple frameworks use the same principal, they
-// share the same counters and removing one framework doesn't remove
-// the counters.
-TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
+// share the same counters, are throttled at the same rate and
+// removing one framework doesn't remove the counters.
+TEST_F(RateLimitingTest, SamePrincipalFrameworks)
 {
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
 
+  Clock::pause();
+
+  // Settle to make sure master is ready for incoming requests, i.e.,
+  // '_recover()' completes.
+  Clock::settle();
+
+  // Advance before the test so that the 1st call to Metrics endpoint
+  // is not throttled. MetricsProcess which hosts the endpoint
+  // throttles requests at 2qps and its singleton instance is shared
+  // across tests.
+  Clock::advance(Milliseconds(501));
+
+  // 1. Register two frameworks.
+
+  // 1.1. Create the first framework.
   MockScheduler sched1;
   // Create MesosSchedulerDriver on the heap because of the need to
   // destroy it during the test due to MESOS-1456.
   MesosSchedulerDriver* driver1 = new MesosSchedulerDriver(
       &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
 
-  {
-    Future<Nothing> registered;
-    EXPECT_CALL(sched1, registered(driver1, _, _))
-      .WillOnce(FutureSatisfy(&registered));
+  EXPECT_CALL(sched1, registered(driver1, _, _))
+    .Times(1);
+
+  // Grab the stuff we need to replay the RegisterFrameworkMessage
+  // for sched1.
+  Future<RegisterFrameworkMessage> registerFrameworkMessage1 = FUTURE_PROTOBUF(
+      RegisterFrameworkMessage(), _, master.get());
+  Future<process::Message> frameworkRegisteredMessage1 = FUTURE_MESSAGE(
+      Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
 
-    ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+  ASSERT_EQ(DRIVER_RUNNING, driver1->start());
 
-    AWAIT_READY(registered);
-  }
+  AWAIT_READY(registerFrameworkMessage1);
+  AWAIT_READY(frameworkRegisteredMessage1);
+
+  const process::UPID sched1Pid = frameworkRegisteredMessage1.get().to;
+
+  // 1.2. Create the second framework.
 
   // 'sched2' uses the same principal "test-principal".
   MockScheduler sched2;
   MesosSchedulerDriver driver2(
       &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
 
-  {
-    Future<Nothing> registered;
-    EXPECT_CALL(sched2, registered(&driver2, _, _))
-      .WillOnce(FutureSatisfy(&registered));
+  EXPECT_CALL(sched2, registered(&driver2, _, _))
+    .Times(1);
 
-    ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+  // Grab the stuff we need to replay the RegisterFrameworkMessage
+  // for sched2.
+  Future<RegisterFrameworkMessage> registerFrameworkMessage2 = FUTURE_PROTOBUF(
+      RegisterFrameworkMessage(), _, master.get());
+  Future<process::Message> frameworkRegisteredMessage2 = FUTURE_MESSAGE(
+      Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
 
-    AWAIT_READY(registered);
-  }
+  ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+  AWAIT_READY(registerFrameworkMessage2);
+  AWAIT_READY(frameworkRegisteredMessage2);
+
+  const process::UPID sched2Pid = frameworkRegisteredMessage2.get().to;
 
   // Message counters added after both frameworks are registered.
   {
     JSON::Object metrics = METRICS_SNAPSHOT;
 
     EXPECT_EQ(
-        1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
-                                 "/messages_received"));
+        1u,
+        metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+                             "/messages_received"));
     EXPECT_EQ(
-        1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
-                                 "/messages_processed"));
+        1u,
+        metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+                             "/messages_processed"));
+  }
+
+  // The 1st message from sched1 is not throttled as it's at the head
+  // of the queue but the 1st message from sched2 is because it's
+  // throttled by the same RateLimiter.
+  Future<process::Message> duplicateFrameworkRegisteredMessage1 =
+    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                   master.get(),
+                   sched1Pid);
+  Future<process::Message> duplicateFrameworkRegisteredMessage2 =
+    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                   master.get(),
+                   sched2Pid);
+
+  process::post(sched1Pid, master.get(), registerFrameworkMessage1.get());
+  process::post(sched2Pid, master.get(), registerFrameworkMessage2.get());
+
+  AWAIT_READY(duplicateFrameworkRegisteredMessage1);
+
+  // Settle to make sure the pending future is indeed caused by
+  // throttling.
+  Clock::settle();
+  EXPECT_TRUE(duplicateFrameworkRegisteredMessage2.isPending());
+
+  // For metrics endpoint.
+  Clock::advance(Milliseconds(501));
+
+  {
+    JSON::Object metrics = METRICS_SNAPSHOT;
+
+    // Two messages received and one processed.
+    const string& messages_received =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
+
+    const string& messages_processed =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+    EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
   }
 
+  // Advance for another half a second to make sure throttled
+  // message is processed.
+  Clock::advance(Milliseconds(501));
+
+  AWAIT_READY(duplicateFrameworkRegisteredMessage2);
+
   Future<Nothing> frameworkRemoved =
     FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
 
@@ -298,19 +742,29 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
   driver1->join();
   delete driver1;
 
+  // Advance to let UnregisterFrameworkMessage come through.
+  Clock::settle();
+  Clock::advance(Seconds(1));
+
   AWAIT_READY(frameworkRemoved);
 
   // Message counters are not removed after the first framework is
   // unregistered.
+
+  // For metrics endpoint.
+  Clock::advance(Milliseconds(501));
+
   {
     JSON::Object metrics = METRICS_SNAPSHOT;
 
     EXPECT_EQ(
-        1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
-                                 "/messages_received"));
+        1u,
+        metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+                             "/messages_received"));
     EXPECT_EQ(
-        1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
-                                 "/messages_processed"));
+        1u,
+        metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+                             "/messages_processed"));
   }
 
   driver2.stop();
@@ -321,12 +775,24 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
 
 
 // Verify that when a scheduler fails over, the new scheduler
-// instance continues to use the same counters.
-TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
+// instance continues to use the same counters and RateLimiter.
+TEST_F(RateLimitingTest, SchedulerFailover)
 {
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
 
+  Clock::pause();
+
+  // Settle to make sure master is ready for incoming requests, i.e.,
+  // '_recover()' completes.
+  Clock::settle();
+
+  // Advance before the test so that the 1st call to Metrics endpoint
+  // is not throttled. MetricsProcess which hosts the endpoint
+  // throttles requests at 2qps and its singleton instance is shared
+  // across tests.
+  Clock::advance(Milliseconds(501));
+
   // 1. Launch the first (i.e., failing) scheduler and verify its
   // counters.
 
@@ -339,11 +805,9 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
     .WillOnce(FutureArg<1>(&frameworkId));
 
   {
-    // Grab this message so we can resend it.
+    // Grab the stuff we need to replay the RegisterFrameworkMessage.
     Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
         RegisterFrameworkMessage(), _, master.get());
-
-    // Grab this message to get the scheduler's pid.
     Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
         Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
 
@@ -369,9 +833,7 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
     AWAIT_READY(duplicateFrameworkRegisteredMessage);
 
     // Settle to make sure message_processed counters are updated.
-    Clock::pause();
     Clock::settle();
-    Clock::resume();
 
     // Verify the message counters.
     JSON::Object metrics = METRICS_SNAPSHOT;
@@ -379,12 +841,12 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
     // One message received and processed after the framework is
     // registered.
     const string& messages_received =
-      "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_received";
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
     EXPECT_EQ(1u, metrics.values.count(messages_received));
     EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
 
     const string& messages_processed =
-      "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
     EXPECT_EQ(1u, metrics.values.count(messages_processed));
     EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
   }
@@ -410,147 +872,88 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
   EXPECT_CALL(sched1, error(&driver1, "Framework failed over"))
     .WillOnce(FutureSatisfy(&sched1Error));
 
-  {
-    // Grab this message to get the scheduler's pid and to make sure we
-    // wait until the framework is registered.
-    Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
-        Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+  // Grab the stuff we need to replay the ReregisterFrameworkMessage.
+  Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+      Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+  Future<ReregisterFrameworkMessage> reregisterFrameworkMessage =
+    FUTURE_PROTOBUF(ReregisterFrameworkMessage(), _, master.get());
 
-    // Grab this message so we can resend it.
-    Future<ReregisterFrameworkMessage> reregisterFrameworkMessage =
-      FUTURE_PROTOBUF(ReregisterFrameworkMessage(), _, master.get());
+  driver2.start();
 
-    driver2.start();
+  AWAIT_READY(reregisterFrameworkMessage);
+  AWAIT_READY(sched1Error);
+  AWAIT_READY(frameworkRegisteredMessage);
 
-    AWAIT_READY(reregisterFrameworkMessage);
-    AWAIT_READY(sched1Error);
-    AWAIT_READY(frameworkRegisteredMessage);
+  const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
 
-    const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+  Future<process::Message> duplicateFrameworkRegisteredMessage =
+    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                   master.get(),
+                   _);
 
-    Future<process::Message> duplicateFrameworkRegisteredMessage =
-      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
-                     master.get(),
-                     _);
+  // Sending a duplicate ReregisterFrameworkMessage to test the
+  // message counters with the new scheduler instance.
+  process::post(schedulerPid, master.get(), reregisterFrameworkMessage.get());
 
-    // Sending a duplicate ReregisterFrameworkMessage to test the
-    // message counters with the new scheduler instance.
-    process::post(schedulerPid, master.get(), reregisterFrameworkMessage.get());
+  // Settle to make sure everything not delayed is processed.
+  Clock::settle();
 
-    AWAIT_READY(duplicateFrameworkRegisteredMessage);
+  // Throttled because the same RateLimiter instance is
+  // throttling the new scheduler instance.
+  EXPECT_TRUE(duplicateFrameworkRegisteredMessage.isPending());
 
-    // Settle to make sure message_processed counters are updated.
-    Clock::pause();
-    Clock::settle();
-    Clock::resume();
+  // Advance for metrics.
+  Clock::advance(Milliseconds(501));
 
+  {
     JSON::Object metrics = METRICS_SNAPSHOT;
 
-    // Another message after sched2 is reregistered plus the one from
-    // the sched1.
+    // Verify that counters correctly indicate the message is
+    // received but not processed.
     const string& messages_received =
-      "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_received";
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
     EXPECT_EQ(1u, metrics.values.count(messages_received));
     EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
 
     const string& messages_processed =
-      "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
     EXPECT_EQ(1u, metrics.values.count(messages_processed));
-    EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value);
+    EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
   }
 
-  EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
-  EXPECT_EQ(DRIVER_STOPPED, driver2.join());
+  // Need another half a second to have it processed.
+  Clock::advance(Milliseconds(501));
 
-  EXPECT_EQ(DRIVER_ABORTED, driver1.stop());
-  EXPECT_EQ(DRIVER_STOPPED, driver1.join());
-
-  Shutdown();
-}
-
-
-// Verify that a framework is being correctly throttled at the
-// configured rate.
-TEST_F(RateLimitingTest, ThrottleFramework)
-{
-  // Throttle at 1 QPS.
-  RateLimits limits;
-  RateLimit* limit = limits.mutable_limits()->Add();
-  limit->set_principal(DEFAULT_CREDENTIAL.principal());
-  limit->set_qps(1);
-  master::Flags flags = CreateMasterFlags();
+  AWAIT_READY(duplicateFrameworkRegisteredMessage);
 
-  flags.rate_limits = JSON::Protobuf(limits);
-
-  Try<PID<Master> > master = StartMaster(flags);
-  ASSERT_SOME(master);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  // Advance for metrics.
+  Clock::advance(Milliseconds(501));
 
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .Times(1);
-
-  // Grab this message so we can resend it.
-  Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
-      RegisterFrameworkMessage(), _, master.get());
-
-  // Grab this message to get the scheduler's pid and to make sure we
-  // wait until the framework is registered.
-  Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
-      Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
-
-  ASSERT_EQ(DRIVER_RUNNING, driver.start());
-
-  AWAIT_READY(registerFrameworkMessage);
-  AWAIT_READY(frameworkRegisteredMessage);
-
-  const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
-
-  Clock::pause();
-
-  // Keep sending duplicate RegisterFrameworkMessages. Master sends
-  // FrameworkRegisteredMessage back after processing each of them.
   {
-    Future<process::Message> duplicateFrameworkRegisteredMessage =
-      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
-                     master.get(),
-                     _);
-
-    process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
-    Clock::settle();
-
-    // The first message is not throttled because it's at the head of
-    // the queue.
-    AWAIT_READY(duplicateFrameworkRegisteredMessage);
-  }
-
-  {
-    Future<process::Message> duplicateFrameworkRegisteredMessage =
-      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
-                     master.get(),
-                     _);
-
-    process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
-    Clock::settle();
-
-    // The 2nd message is throttled for a second.
-    Clock::advance(Milliseconds(500));
-    Clock::settle();
-
-    EXPECT_TRUE(duplicateFrameworkRegisteredMessage.isPending());
+    JSON::Object metrics = METRICS_SNAPSHOT;
 
-    Clock::advance(Milliseconds(501));
-    Clock::settle();
+    // Another message after sched2 is reregistered plus the one from
+    // the sched1.
+    const string& messages_received =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
 
-    AWAIT_READY(duplicateFrameworkRegisteredMessage);
+    const string& messages_processed =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+    EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value);
   }
 
-  Clock::resume();
+  EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver2.join());
 
-  EXPECT_EQ(DRIVER_STOPPED, driver.stop());
-  EXPECT_EQ(DRIVER_STOPPED, driver.join());
+  EXPECT_EQ(DRIVER_ABORTED, driver1.stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver1.join());
 
   Shutdown();
 }
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {


[6/6] git commit: Refactored the querying and parsing of metrics snapshot into a MACRO.

Posted by ya...@apache.org.
Refactored the querying and parsing of metrics snapshot into a MACRO.

Review: https://reviews.apache.org/r/22639


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a6236a43
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a6236a43
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a6236a43

Branch: refs/heads/master
Commit: a6236a43d19dfd95b665463215db50525375efbc
Parents: 1b2596d
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Fri Jun 13 19:57:08 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Mon Jun 23 11:19:40 2014 -0700

----------------------------------------------------------------------
 src/tests/rate_limiting_tests.cpp | 136 +++++++--------------------------
 1 file changed, 26 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a6236a43/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index 9a54461..2a37fc1 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -51,6 +51,23 @@ using testing::_;
 using testing::Eq;
 using testing::Return;
 
+// Query Mesos metrics snapshot endpoint and return a JSON::Object
+// result.
+#define METRICS_SNAPSHOT                                                       \
+  ({ Future<process::http::Response> response =                                \
+       process::http::get(MetricsProcess::instance()->self(), "snapshot");     \
+     AWAIT_READY(response);                                                    \
+                                                                               \
+     EXPECT_SOME_EQ(                                                           \
+         "application/json",                                                   \
+         response.get().headers.get("Content-Type"));                          \
+                                                                               \
+     Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); \
+     ASSERT_SOME(parse);                                                       \
+                                                                               \
+     parse.get(); })
+
+
 // This test case covers tests related to framework API rate limiting
 // which includes metrics exporting for API call rates.
 class RateLimitingTest : public MesosTest {};
@@ -65,20 +82,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
 
   // Message counters not present before the framework registers.
   {
-    // TODO(xujyan): It would be nice to refactor out the common
-    // metrics snapshot querying logic into a helper method/MACRO.
-    Future<process::http::Response> response =
-      process::http::get(MetricsProcess::instance()->self(), "snapshot");
-    AWAIT_READY(response);
-
-    EXPECT_SOME_EQ(
-        "application/json",
-        response.get().headers.get("Content-Type"));
-
-    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
-    ASSERT_SOME(parse);
-
-    JSON::Object metrics = parse.get();
+    JSON::Object metrics = METRICS_SNAPSHOT;
 
     EXPECT_EQ(
         0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
@@ -105,18 +109,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
 
   // Message counters added after the framework is registered.
   {
-    Future<process::http::Response> response =
-      process::http::get(MetricsProcess::instance()->self(), "snapshot");
-    AWAIT_READY(response);
-
-    EXPECT_SOME_EQ(
-        "application/json",
-        response.get().headers.get("Content-Type"));
-
-    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
-    ASSERT_SOME(parse);
-
-    JSON::Object metrics = parse.get();
+    JSON::Object metrics = METRICS_SNAPSHOT;
 
     EXPECT_EQ(
         1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
@@ -138,18 +131,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
 
   // Message counter removed after the framework is unregistered.
   {
-    Future<process::http::Response> response =
-      process::http::get(MetricsProcess::instance()->self(), "snapshot");
-    AWAIT_READY(response);
-
-    EXPECT_SOME_EQ(
-        "application/json",
-        response.get().headers.get("Content-Type"));
-
-    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
-    ASSERT_SOME(parse);
-
-    JSON::Object metrics = parse.get();
+    JSON::Object metrics = METRICS_SNAPSHOT;
 
     EXPECT_EQ(
         0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
@@ -214,18 +196,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
 
   // 2. Verify that both frameworks have message counters added.
   {
-    Future<process::http::Response> response =
-      process::http::get(MetricsProcess::instance()->self(), "snapshot");
-    AWAIT_READY(response);
-
-    EXPECT_SOME_EQ(
-        "application/json",
-        response.get().headers.get("Content-Type"));
-
-    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
-    ASSERT_SOME(parse);
-
-    JSON::Object metrics = parse.get();
+    JSON::Object metrics = METRICS_SNAPSHOT;
 
     EXPECT_EQ(
         1u, metrics.values.count("frameworks/framework1/messages_received"));
@@ -250,18 +221,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
   // 4. Its message counters are deleted while the other framework's
   //    counters stay.
   {
-    Future<process::http::Response> response =
-      process::http::get(MetricsProcess::instance()->self(), "snapshot");
-    AWAIT_READY(response);
-
-    EXPECT_SOME_EQ(
-        "application/json",
-        response.get().headers.get("Content-Type"));
-
-    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
-    ASSERT_SOME(parse);
-
-    JSON::Object metrics = parse.get();
+    JSON::Object metrics = METRICS_SNAPSHOT;
 
     EXPECT_EQ(
         0u, metrics.values.count("frameworks/framework1/messages_received"));
@@ -321,18 +281,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
 
   // Message counters added after both frameworks are registered.
   {
-    Future<process::http::Response> response =
-      process::http::get(MetricsProcess::instance()->self(), "snapshot");
-    AWAIT_READY(response);
-
-    EXPECT_SOME_EQ(
-        "application/json",
-        response.get().headers.get("Content-Type"));
-
-    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
-    ASSERT_SOME(parse);
-
-    JSON::Object metrics = parse.get();
+    JSON::Object metrics = METRICS_SNAPSHOT;
 
     EXPECT_EQ(
         1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
@@ -354,18 +303,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
   // Message counters are not removed after the first framework is
   // unregistered.
   {
-    Future<process::http::Response> response =
-      process::http::get(MetricsProcess::instance()->self(), "snapshot");
-    AWAIT_READY(response);
-
-    EXPECT_SOME_EQ(
-        "application/json",
-        response.get().headers.get("Content-Type"));
-
-    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
-    ASSERT_SOME(parse);
-
-    JSON::Object metrics = parse.get();
+    JSON::Object metrics = METRICS_SNAPSHOT;
 
     EXPECT_EQ(
         1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
@@ -436,18 +374,7 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
     Clock::resume();
 
     // Verify the message counters.
-    Future<process::http::Response> response =
-      process::http::get(MetricsProcess::instance()->self(), "snapshot");
-    AWAIT_READY(response);
-
-    EXPECT_SOME_EQ(
-        "application/json",
-        response.get().headers.get("Content-Type"));
-
-    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
-    ASSERT_SOME(parse);
-
-    JSON::Object metrics = parse.get();
+    JSON::Object metrics = METRICS_SNAPSHOT;
 
     // One message received and processed after the framework is
     // registered.
@@ -517,18 +444,7 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
     Clock::settle();
     Clock::resume();
 
-    Future<process::http::Response> response =
-      process::http::get(MetricsProcess::instance()->self(), "snapshot");
-    AWAIT_READY(response);
-
-    EXPECT_SOME_EQ(
-        "application/json",
-        response.get().headers.get("Content-Type"));
-
-    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
-    ASSERT_SOME(parse);
-
-    JSON::Object metrics = parse.get();
+    JSON::Object metrics = METRICS_SNAPSHOT;
 
     // Another message after sched2 is reregistered plus the one from
     // the sched1.


[5/6] git commit: Added flags::parse() overload for RateLimits protobuf.

Posted by ya...@apache.org.
Added flags::parse() overload for RateLimits protobuf.

Review: https://reviews.apache.org/r/22758


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1cd0a072
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1cd0a072
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1cd0a072

Branch: refs/heads/master
Commit: 1cd0a072d9dafe8d6835ec02afca13b0124cae8c
Parents: ac44eb5
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Wed Jun 18 13:39:59 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Mon Jun 23 11:19:40 2014 -0700

----------------------------------------------------------------------
 src/common/parse.hpp              | 14 ++++++++++++++
 src/common/type_utils.hpp         |  8 ++++++++
 src/master/flags.hpp              |  2 +-
 src/master/master.cpp             | 10 +---------
 src/tests/rate_limiting_tests.cpp |  6 +++---
 5 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1cd0a072/src/common/parse.hpp
----------------------------------------------------------------------
diff --git a/src/common/parse.hpp b/src/common/parse.hpp
index 5b12e38..4eb5096 100644
--- a/src/common/parse.hpp
+++ b/src/common/parse.hpp
@@ -38,6 +38,20 @@ inline Try<mesos::ACLs> parse(const std::string& value)
   return protobuf::parse<mesos::ACLs>(json.get());
 }
 
+
+template<>
+inline Try<mesos::RateLimits> parse(const std::string& value)
+{
+  // Convert from string or file to JSON.
+  Try<JSON::Object> json = parse<JSON::Object>(value);
+  if (json.isError()) {
+    return Error(json.error());
+  }
+
+  // Convert from JSON to Protobuf.
+  return protobuf::parse<mesos::RateLimits>(json.get());
+}
+
 } // namespace flags {
 
 #endif // __COMMON_PARSE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/1cd0a072/src/common/type_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.hpp b/src/common/type_utils.hpp
index 27ea4d2..bb357ac 100644
--- a/src/common/type_utils.hpp
+++ b/src/common/type_utils.hpp
@@ -122,6 +122,14 @@ inline std::ostream& operator << (
 }
 
 
+inline std::ostream& operator << (
+    std::ostream& stream,
+    const RateLimits& limits)
+{
+  return stream << limits.DebugString();
+}
+
+
 inline bool operator == (const FrameworkID& left, const FrameworkID& right)
 {
   return left.value() == right.value();

http://git-wip-us.apache.org/repos/asf/mesos/blob/1cd0a072/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index 47bb0dc..70751d2 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -287,7 +287,7 @@ public:
   bool authenticate_slaves;
   Option<std::string> credentials;
   Option<ACLs> acls;
-  Option<JSON::Object> rate_limits;
+  Option<RateLimits> rate_limits;
 };
 
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/1cd0a072/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 72470da..f376e60 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -46,7 +46,6 @@
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
-#include <stout/protobuf.hpp>
 #include <stout/stringify.hpp>
 #include <stout/utils.hpp>
 #include <stout/uuid.hpp>
@@ -355,15 +354,8 @@ void Master::initialize()
   }
 
   if (flags.rate_limits.isSome()) {
-    Try<RateLimits> limits =
-      ::protobuf::parse<RateLimits>(flags.rate_limits.get());
-    if (limits.isError()) {
-      EXIT(1) << "Invalid RateLimits format: " << limits.error()
-              << " (see --rate_limits flag)";
-    }
-
     // Add framework rate limiters.
-    foreach (const RateLimit& limit_, limits.get().limits()) {
+    foreach (const RateLimit& limit_, flags.rate_limits.get().limits()) {
       if (limiters.contains(limit_.principal())) {
         EXIT(1) << "Duplicate principal " << limit_.principal()
                 << " found in RateLimits configuration";

http://git-wip-us.apache.org/repos/asf/mesos/blob/1cd0a072/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index 9775b77..9cb3717 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -88,7 +88,7 @@ public:
     // metrics endpoint (because it also throttles requests but at
     // 2qps) don't mess with framework rate limiting.
     limit->set_qps(1);
-    flags.rate_limits = JSON::Protobuf(limits);
+    flags.rate_limits = limits;
 
     return flags;
   }
@@ -106,7 +106,7 @@ TEST_F(RateLimitingTest, NoRateLimiting)
   RateLimits limits;
   RateLimit* limit = limits.mutable_limits()->Add();
   limit->set_principal(DEFAULT_CREDENTIAL.principal());
-  flags.rate_limits = JSON::Protobuf(limits);
+  flags.rate_limits = limits;
 
   Try<PID<Master> > master = StartMaster(flags);
   ASSERT_SOME(master);
@@ -363,7 +363,7 @@ TEST_F(RateLimitingTest, DifferentPrincipalFrameworks)
   RateLimit* limit2 = limits.mutable_limits()->Add();
   limit2->set_principal("framework2");
   limit2->set_qps(0.5);
-  flags.rate_limits = JSON::Protobuf(limits);
+  flags.rate_limits = limits;
 
   flags.authenticate_frameworks = false;
 


[4/6] git commit: Removed the wait in StartMaster() for the master to get elected before returning because it's no longer necessary.

Posted by ya...@apache.org.
Removed the wait in StartMaster() for the master to get elected before returning because it's no longer necessary.

- Cluster::Masters::start now waits for the master to be recovered, which happens only after it's elected.

Review: https://reviews.apache.org/r/22856


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1b2596d6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1b2596d6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1b2596d6

Branch: refs/heads/master
Commit: 1b2596d6174d284909bdef5548c524ba2fdae305
Parents: bd8343a
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Fri Jun 20 00:37:42 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Mon Jun 23 11:19:40 2014 -0700

----------------------------------------------------------------------
 src/tests/cluster.hpp |  2 ++
 src/tests/mesos.cpp   | 53 ++++++----------------------------------------
 src/tests/mesos.hpp   | 19 +++--------------
 3 files changed, 11 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1b2596d6/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 1c96ee7..a83a60e 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -423,6 +423,8 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
   // Speed up the tests by ensuring that the Master is recovered
   // before the test proceeds. Otherwise, authentication and
   // registration messages may be dropped, causing delayed retries.
+  // NOTE: The tests may still need to settle the Clock while it's
+  // paused to ensure that the Master finishes executing _recover().
   if (!_recover.await(Seconds(10))) {
     LOG(FATAL) << "Failed to wait for _recover";
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/1b2596d6/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 1037420..24af32b 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -45,8 +45,6 @@ using std::string;
 
 using namespace process;
 
-using testing::_;
-
 namespace mesos {
 namespace internal {
 namespace tests {
@@ -159,67 +157,28 @@ slave::Flags MesosTest::CreateSlaveFlags()
 
 
 Try<process::PID<master::Master> > MesosTest::StartMaster(
-    const Option<master::Flags>& flags,
-    bool wait)
+    const Option<master::Flags>& flags)
 {
-  Future<Nothing> detected = FUTURE_DISPATCH(_, &master::Master::detected);
-
-  Try<process::PID<master::Master> > master = cluster.masters.start(
+  return cluster.masters.start(
       flags.isNone() ? CreateMasterFlags() : flags.get());
-
-  // Wait until the leader is detected because otherwise this master
-  // may reject authentication requests because it doesn't know it's
-  // the leader yet [MESOS-881].
-  if (wait && master.isSome() && !detected.await(Seconds(10))) {
-    return Error("Failed to wait " + stringify(Seconds(10)) +
-                 " for master to detect the leader");
-  }
-
-  return master;
 }
 
 
 Try<process::PID<master::Master> > MesosTest::StartMaster(
     master::allocator::AllocatorProcess* allocator,
-    const Option<master::Flags>& flags,
-    bool wait)
+    const Option<master::Flags>& flags)
 {
-  Future<Nothing> detected = FUTURE_DISPATCH(_, &master::Master::detected);
-
-  Try<process::PID<master::Master> > master = cluster.masters.start(
+  return cluster.masters.start(
       allocator, flags.isNone() ? CreateMasterFlags() : flags.get());
-
-  // Wait until the leader is detected because otherwise this master
-  // may reject authentication requests because it doesn't know it's
-  // the leader yet [MESOS-881].
-  if (wait && master.isSome() && !detected.await(Seconds(10))) {
-    return Error("Failed to wait " + stringify(Seconds(10)) +
-                 " for master to detect the leader");
-  }
-
-  return master;
 }
 
 
 Try<process::PID<master::Master> > MesosTest::StartMaster(
     Authorizer* authorizer,
-    const Option<master::Flags>& flags,
-    bool wait)
+    const Option<master::Flags>& flags)
 {
-  Future<Nothing> detected = FUTURE_DISPATCH(_, &master::Master::detected);
-
-  Try<process::PID<master::Master> > master = cluster.masters.start(
+  return cluster.masters.start(
       authorizer, flags.isNone() ? CreateMasterFlags() : flags.get());
-
-  // Wait until the leader is detected because otherwise this master
-  // may reject authentication requests because it doesn't know it's
-  // the leader yet [MESOS-881].
-  if (wait && master.isSome() && !detected.await(Seconds(10))) {
-    return Error("Failed to wait " + stringify(Seconds(10)) +
-                 " for master to detect the leader");
-  }
-
-  return master;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/1b2596d6/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 0b9b2f9..c40c82d 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -83,33 +83,20 @@ protected:
   virtual slave::Flags CreateSlaveFlags();
 
   // Starts a master with the specified flags.
-  // Waits for the master to detect a leader (could be itself) before
-  // returning if 'wait' is set to true.
-  // TODO(xujyan): Return a future which becomes ready when the
-  // master detects a leader (when wait == true) and have the tests
-  // do AWAIT_READY.
   virtual Try<process::PID<master::Master> > StartMaster(
-      const Option<master::Flags>& flags = None(),
-      bool wait = true);
+      const Option<master::Flags>& flags = None());
 
   // Starts a master with the specified allocator process and flags.
-  // Waits for the master to detect a leader (could be itself) before
-  // returning if 'wait' is set to true.
-  // TODO(xujyan): Return a future which becomes ready when the
-  // master detects a leader (when wait == true) and have the tests
-  // do AWAIT_READY.
   virtual Try<process::PID<master::Master> > StartMaster(
       master::allocator::AllocatorProcess* allocator,
-      const Option<master::Flags>& flags = None(),
-      bool wait = true);
+      const Option<master::Flags>& flags = None());
 
   // Starts a master with the specified authorizer and flags.
   // Waits for the master to detect a leader (could be itself) before
   // returning if 'wait' is set to true.
   virtual Try<process::PID<master::Master> > StartMaster(
       Authorizer* authorizer,
-      const Option<master::Flags>& flags = None(),
-      bool wait = true);
+      const Option<master::Flags>& flags = None());
 
   // Starts a slave with the specified flags.
   virtual Try<process::PID<slave::Slave> > StartSlave(