You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2014/06/23 20:25:44 UTC
[1/6] git commit: Added support for optionally throttling the
frameworks not specified in RateLimits config.
Repository: mesos
Updated Branches:
refs/heads/master bdca37fe6 -> f943e2fc8
Added support for optionally throttling the frameworks not specified in RateLimits config.
Review: https://reviews.apache.org/r/22777
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f943e2fc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f943e2fc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f943e2fc
Branch: refs/heads/master
Commit: f943e2fc8ecd06b2c3b807e7ad6781cec21617c3
Parents: 1cd0a07
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Fri Jun 20 00:29:10 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Mon Jun 23 11:19:40 2014 -0700
----------------------------------------------------------------------
include/mesos/mesos.proto | 13 ++---
src/master/flags.hpp | 3 +-
src/master/master.cpp | 96 ++++++++++++++++++++++++++--------
src/master/master.hpp | 9 +++-
src/tests/rate_limiting_tests.cpp | 6 +--
5 files changed, 92 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f943e2fc/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 2f6be05..6968411 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -765,14 +765,15 @@ message RateLimit {
/**
* Collection of RateLimit.
- * Frameworks without rate limits defined here are not throttled.
- * TODO(xujyan): Currently when a framework is not specified in 'limits' it is
- * not throttled. This can be done more explicitly by adding a RateLimit entry
- * without setting its 'qps'. We should consider adding an optional
- * 'aggregate_default_qps' which can be used to throttle the frameworks not
- * specified here.
+ * Frameworks without rate limits defined here are not throttled
+ * unless 'aggregate_default_qps' is specified.
*/
message RateLimits {
// Items should have unique principals.
repeated RateLimit limits = 1;
+
+ // All the frameworks not specified in 'limits' get this default
+ // rate. This rate is an aggregate rate for all of them, i.e.,
+ // their combined traffic is throttled together at this rate.
+ optional double aggregate_default_qps = 2;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f943e2fc/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index 70751d2..cd2a70e 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -258,7 +258,8 @@ public:
" {\n"
" \"principal\": \"bar\"\n"
" }\n"
- " ]\n"
+ " ],\n"
+ " \"aggregate_default_qps\": 33.3\n"
"}");
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f943e2fc/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f376e60..bde0e57 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -374,6 +374,18 @@ void Master::initialize()
: Option<Owned<RateLimiter> >::none());
}
+ if (flags.rate_limits.get().has_aggregate_default_qps() &&
+ flags.rate_limits.get().aggregate_default_qps() <= 0) {
+ EXIT(1) << "Invalid aggregate_default_qps: "
+ << flags.rate_limits.get().aggregate_default_qps()
+ << ". It must be a positive number";
+ }
+
+ if (flags.rate_limits.get().has_aggregate_default_qps()) {
+ defaultLimiter = Owned<RateLimiter>(
+ new RateLimiter(flags.rate_limits.get().aggregate_default_qps()));
+ }
+
LOG(INFO) << "Framework rate limiting enabled";
}
@@ -779,13 +791,30 @@ void Master::exited(const UPID& pid)
void Master::visit(const MessageEvent& event)
{
+ // There are three cases about the message's UPID with respect to
+ // 'frameworks.principals':
+ // 1) if a <UPID, principal> pair exists and the principal is Some,
+ // it's a framework with its principal specified.
+ // 2) if a <UPID, principal> pair exists and the principal is None,
+ // it's a framework without a principal.
+ // 3) if a <UPID, principal> pair does not exist in the map, it's
+ // either an unregistered framework or not a framework.
+ // The logic for framework message counters and rate limiting
+ // is mainly concerned with whether the UPID is a *registered*
+ // framework and whether the framework has a principal so we use
+ // these two temp variables to simplify the condition checks below.
+ bool isRegisteredFramework =
+ frameworks.principals.contains(event.message->from);
+ const Option<string> principal = isRegisteredFramework
+ ? frameworks.principals[event.message->from]
+ : Option<string>::none();
+
// Increment the "message_received" counter if the message is from
// a framework and such a counter is configured for it.
// See comments for 'Master::Metrics::Frameworks' and
// 'Master::Frameworks::principals' for details.
- const Option<string> principal =
- frameworks.principals.get(event.message->from);
if (principal.isSome()) {
+ // If the framework has a principal, the counter must exist.
CHECK(metrics.frameworks.contains(principal.get()));
Counter messages_received =
metrics.frameworks.get(principal.get()).get()->messages_received;
@@ -814,20 +843,29 @@ void Master::visit(const MessageEvent& event)
return;
}
+ // Necessary to disambiguate below.
+ typedef void(Self::*F)(const MessageEvent&);
+
// Throttle the message if it's a framework message and a
// RateLimiter is configured for the framework's principal.
+ // The framework is throttled by the default RateLimiter if:
+ // 1) the default RateLimiter is configured (and)
+ // 2) the framework doesn't have a principal or its principal is
+ // not specified in 'flags.rate_limits'.
// The framework is not throttled if:
- // 1) the principal is not specified by the framework (or)
- // 2) the principal doesn't exist in RateLimits configuration (or)
- // 3) the principal exists in RateLimits but 'qps' is not set.
+ // 1) the default RateLimiter is not configured to handle case 2)
+ // above. (or)
+ // 2) the principal exists in RateLimits but 'qps' is not set.
if (principal.isSome() &&
limiters.contains(principal.get()) &&
limiters[principal.get()].isSome()) {
- // Necessary to disambiguate.
- typedef void(Self::*F)(const MessageEvent&);
-
limiters[principal.get()].get()->acquire()
.onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+ } else if ((principal.isNone() || !limiters.contains(principal.get())) &&
+ isRegisteredFramework &&
+ defaultLimiter.isSome()) {
+ defaultLimiter.get()->acquire()
+ .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
} else {
_visit(event);
}
@@ -836,22 +874,30 @@ void Master::visit(const MessageEvent& event)
void Master::visit(const process::ExitedEvent& event)
{
- // Throttle the message if it's a framework message and a
- // RateLimiter is configured for the framework's principal.
+ // See comments in 'visit(const MessageEvent& event)' for which
+ // RateLimiter is used to throttle this UPID and when it is not
+ // throttled.
// Note that throttling ExitedEvent is necessary so the order
// between MessageEvents and ExitedEvents from the same PID is
// maintained.
- // See comments in 'visit(const MessageEvent& event)' for when a
- // message is not throttled.
- const Option<string> principal = frameworks.principals.get(event.pid);
+ bool isRegisteredFramework = frameworks.principals.contains(event.pid);
+ const Option<string> principal = isRegisteredFramework
+ ? frameworks.principals[event.pid]
+ : Option<string>::none();
+
+ // Necessary to disambiguate below.
+ typedef void(Self::*F)(const ExitedEvent&);
+
if (principal.isSome() &&
limiters.contains(principal.get()) &&
limiters[principal.get()].isSome()) {
- // Necessary to disambiguate.
- typedef void(Self::*F)(const ExitedEvent&);
-
limiters[principal.get()].get()->acquire()
.onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+ } else if ((principal.isNone() || !limiters.contains(principal.get())) &&
+ isRegisteredFramework &&
+ defaultLimiter.isSome()) {
+ defaultLimiter.get()->acquire()
+ .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
} else {
_visit(event);
}
@@ -864,7 +910,9 @@ void Master::_visit(const MessageEvent& event)
// mapping may be deleted in handling 'UnregisterFrameworkMessage'
// but its counter still needs to be incremented for this message.
const Option<string> principal =
- frameworks.principals.get(event.message->from);
+ frameworks.principals.contains(event.message->from)
+ ? frameworks.principals[event.message->from]
+ : Option<string>::none();
ProtobufProcess<Master>::visit(event);
@@ -3688,11 +3736,11 @@ void Master::addFramework(Framework* framework)
principal = framework->info.principal();
}
+ CHECK(!frameworks.principals.contains(framework->pid));
+ frameworks.principals.put(framework->pid, principal);
+
// Export framework metrics if a principal is specified.
if (principal.isSome()) {
- CHECK(!frameworks.principals.contains(framework->pid));
- frameworks.principals.put(framework->pid, principal.get());
-
// Create new framework metrics if this framework is the first
// one of this principal. Otherwise existing metrics are reused.
if (!metrics.frameworks.contains(principal.get())) {
@@ -3843,11 +3891,13 @@ void Master::removeFramework(Framework* framework)
// Remove the framework from authenticated.
authenticated.erase(framework->pid);
+ CHECK(frameworks.principals.contains(framework->pid));
+ const Option<string> principal = frameworks.principals[framework->pid];
+
+ frameworks.principals.erase(framework->pid);
+
// Remove the framework's message counters.
- const Option<string> principal = frameworks.principals.get(framework->pid);
if (principal.isSome()) {
- frameworks.principals.erase(framework->pid);
-
// Remove the metrics for the principal if this framework is the
// last one with this principal.
if (!frameworks.principals.containsValue(principal.get())) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f943e2fc/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index f4cdb26..5fef354 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -495,13 +495,14 @@ private:
// Principals of frameworks keyed by PID.
// NOTE: Multiple PIDs can map to the same principal. The
- // differences between this map and 'authenticated' are:
+ // principal is None when the framework doesn't specify it.
+ // The differences between this map and 'authenticated' are:
// 1) This map only includes *registered* frameworks. The mapping
// is added when a framework (re-)registers.
// 2) This map includes unauthenticated frameworks (when Master
// allows them) if they have principals specified in
// FrameworkInfo.
- hashmap<process::UPID, std::string> principals;
+ hashmap<process::UPID, Option<std::string> > principals;
} frameworks;
hashmap<OfferID, Offer*> offers;
@@ -724,6 +725,10 @@ private:
// Like Metrics::Frameworks, all frameworks of the same principal
// are throttled together at a common rate limit.
hashmap<std::string, Option<process::Owned<process::RateLimiter> > > limiters;
+
+ // The default limiter is for frameworks not specified in
+ // 'flags.rate_limits'.
+ Option<process::Owned<process::RateLimiter> > defaultLimiter;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/f943e2fc/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index 9cb3717..fc23a19 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -356,13 +356,13 @@ TEST_F(RateLimitingTest, DifferentPrincipalFrameworks)
master::Flags flags = CreateMasterFlags();
// Configure RateLimits to be 1qps and 0.5qps for two frameworks.
+ // Rate for the second framework is implicitly specified via
+ // 'aggregate_default_qps'.
RateLimits limits;
RateLimit* limit1 = limits.mutable_limits()->Add();
limit1->set_principal("framework1");
limit1->set_qps(1);
- RateLimit* limit2 = limits.mutable_limits()->Add();
- limit2->set_principal("framework2");
- limit2->set_qps(0.5);
+ limits.set_aggregate_default_qps(0.5);
flags.rate_limits = limits;
flags.authenticate_frameworks = false;
[3/6] git commit: Added a log line to indicate framework rate
limiting is enabled.
Posted by ya...@apache.org.
Added a log line to indicate framework rate limiting is enabled.
Review: https://reviews.apache.org/r/22745
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bd8343a6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bd8343a6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bd8343a6
Branch: refs/heads/master
Commit: bd8343a634152ca1cb3c6043c0a4ab4f41619b4e
Parents: bdca37f
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Wed Jun 18 10:57:36 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Mon Jun 23 11:19:40 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd8343a6/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 00152f5..72470da 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -381,6 +381,8 @@ void Master::initialize()
Owned<RateLimiter>(new RateLimiter(limit_.qps())))
: Option<Owned<RateLimiter> >::none());
}
+
+ LOG(INFO) << "Framework rate limiting enabled";
}
hashmap<string, RoleInfo> roleInfos;
[2/6] git commit: Added more tests for framework rate limiting.
Posted by ya...@apache.org.
Added more tests for framework rate limiting.
Review: https://reviews.apache.org/r/22740
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ac44eb59
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ac44eb59
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ac44eb59
Branch: refs/heads/master
Commit: ac44eb59beef16cf7383bc79834d7a4b62fe5567
Parents: a6236a4
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Fri Jun 20 14:56:37 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Mon Jun 23 11:19:40 2014 -0700
----------------------------------------------------------------------
src/tests/rate_limiting_tests.cpp | 805 +++++++++++++++++++++++++--------
1 file changed, 604 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac44eb59/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index 2a37fc1..9775b77 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -51,6 +51,10 @@ using testing::_;
using testing::Eq;
using testing::Return;
+namespace mesos {
+namespace internal {
+namespace master {
+
// Query Mesos metrics snapshot endpoint and return a JSON::Object
// result.
#define METRICS_SNAPSHOT \
@@ -70,27 +74,68 @@ using testing::Return;
// This test case covers tests related to framework API rate limiting
// which includes metrics exporting for API call rates.
-class RateLimitingTest : public MesosTest {};
+class RateLimitingTest : public MesosTest
+{
+public:
+ virtual master::Flags CreateMasterFlags()
+ {
+ master::Flags flags = MesosTest::CreateMasterFlags();
+
+ RateLimits limits;
+ RateLimit* limit = limits.mutable_limits()->Add();
+ limit->set_principal(DEFAULT_CREDENTIAL.principal());
+ // Set 1qps so that the half-second Clock::advance()s for
+ // metrics endpoint (because it also throttles requests but at
+ // 2qps) don't mess with framework rate limiting.
+ limit->set_qps(1);
+ flags.rate_limits = JSON::Protobuf(limits);
+
+ return flags;
+ }
+};
// Verify that message counters for a framework are added when a
-// framework registers and removed when it terminates.
-TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
+// framework registers, removed when it terminates and count messages
+// correctly when it is given unlimited rate.
+TEST_F(RateLimitingTest, NoRateLimiting)
{
- Try<PID<Master> > master = StartMaster();
+ // Give the framework unlimited rate explicitly by specifying a
+ // RateLimit entry without 'qps'
+ master::Flags flags = CreateMasterFlags();
+ RateLimits limits;
+ RateLimit* limit = limits.mutable_limits()->Add();
+ limit->set_principal(DEFAULT_CREDENTIAL.principal());
+ flags.rate_limits = JSON::Protobuf(limits);
+
+ Try<PID<Master> > master = StartMaster(flags);
ASSERT_SOME(master);
+ Clock::pause();
+
+ // Settle to make sure master is ready for incoming requests, i.e.,
+ // '_recover()' completes.
+ Clock::settle();
+
+ // Advance before the test so that the 1st call to Metrics endpoint
+ // is not throttled. MetricsProcess which hosts the endpoint
+ // throttles requests at 2qps and its singleton instance is shared
+ // across tests.
+ Clock::advance(Milliseconds(501));
+
// Message counters not present before the framework registers.
{
JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
- 0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_received"));
+ 0u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_received"));
EXPECT_EQ(
- 0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_processed"));
+ 0u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_processed"));
}
MockScheduler sched;
@@ -99,25 +144,49 @@ TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
MesosSchedulerDriver* driver = new MesosSchedulerDriver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
- Future<Nothing> registered;
EXPECT_CALL(sched, registered(driver, _, _))
- .WillOnce(FutureSatisfy(&registered));
+ .Times(1);
+
+ // Grab the stuff we need to replay the RegisterFrameworkMessage.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
ASSERT_EQ(DRIVER_RUNNING, driver->start());
- AWAIT_READY(registered);
+ AWAIT_READY(registerFrameworkMessage);
+ AWAIT_READY(frameworkRegisteredMessage);
+
+ const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+ // For metrics endpoint.
+ Clock::advance(Milliseconds(501));
- // Message counters added after the framework is registered.
+ // Send a duplicate RegisterFrameworkMessage. Master sends
+ // FrameworkRegisteredMessage back after processing it.
{
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+ // Verify that one message is received and processed (after
+ // registration).
JSON::Object metrics = METRICS_SNAPSHOT;
- EXPECT_EQ(
- 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_received"));
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
- EXPECT_EQ(
- 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_processed"));
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
}
Future<Nothing> frameworkRemoved =
@@ -127,36 +196,195 @@ TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
driver->join();
delete driver;
+ // The fact that UnregisterFrameworkMessage (the 2nd message from
+ // 'sched' that reaches Master after its registration) gets
+ // processed without Clock advances proves that the framework is
+ // given unlimited rate.
AWAIT_READY(frameworkRemoved);
- // Message counter removed after the framework is unregistered.
+ // For metrics endpoint.
+ Clock::advance(Milliseconds(501));
+
+ // Message counters removed after the framework is unregistered.
{
JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
- 0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_received"));
+ 0u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_received"));
EXPECT_EQ(
- 0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_processed"));
+ 0u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_processed"));
}
Shutdown();
}
-// Verify that framework message counters work with frameworks of
-// different principals.
-TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
+// Verify that a framework is being correctly throttled at the
+// configured rate.
+TEST_F(RateLimitingTest, RateLimitingEnabled)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ Clock::pause();
+
+ // Settle to make sure master is ready for incoming requests, i.e.,
+ // '_recover()' completes.
+ Clock::settle();
+
+ // Advance before the test so that the 1st call to Metrics endpoint
+ // is not throttled. MetricsProcess which hosts the endpoint
+ // throttles requests at 2qps and its singleton instance is shared
+ // across tests.
+ Clock::advance(Milliseconds(501));
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ // Grab the stuff we need to replay the RegisterFrameworkMessage.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+ ASSERT_EQ(DRIVER_RUNNING, driver.start());
+
+ AWAIT_READY(registerFrameworkMessage);
+ AWAIT_READY(frameworkRegisteredMessage);
+
+ const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+ // Keep sending duplicate RegisterFrameworkMessages. Master sends
+ // FrameworkRegisteredMessage back after processing each of them.
+ {
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+
+ // The first message is not throttled because it's at the head of
+ // the queue.
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+ // Verify that one message is received and processed (after
+ // registration).
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
+
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+ }
+
+ // The 2nd message is throttled for a second.
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+
+ // Advance for half a second and verify that the message is still
+ // not processed.
+ Clock::advance(Milliseconds(501));
+
+ // Settle to make sure all events not delayed are processed.
+ Clock::settle();
+
+ {
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+
+ // The 2nd message is received but not processed after half
+ // a second because of throttling.
+ EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+ EXPECT_TRUE(duplicateFrameworkRegisteredMessage.isPending());
+ }
+
+ // After another half a second the message should be processed.
+ Clock::advance(Milliseconds(501));
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+ // Verify counters after processing of the message.
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+
+ EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
+ EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value);
+
+ EXPECT_EQ(DRIVER_STOPPED, driver.stop());
+ EXPECT_EQ(DRIVER_STOPPED, driver.join());
+
+ Shutdown();
+}
+
+
+// Verify that framework message counters and rate limiters work with
+// frameworks of different principals which are throttled at
+// different rates.
+TEST_F(RateLimitingTest, DifferentPrincipalFrameworks)
{
- // 1. Register two frameworks.
master::Flags flags = CreateMasterFlags();
+
+ // Configure RateLimits to be 1qps and 0.5qps for two frameworks.
+ RateLimits limits;
+ RateLimit* limit1 = limits.mutable_limits()->Add();
+ limit1->set_principal("framework1");
+ limit1->set_qps(1);
+ RateLimit* limit2 = limits.mutable_limits()->Add();
+ limit2->set_principal("framework2");
+ limit2->set_qps(0.5);
+ flags.rate_limits = JSON::Protobuf(limits);
+
flags.authenticate_frameworks = false;
Try<PID<Master> > master = StartMaster(flags);
ASSERT_SOME(master);
+ Clock::pause();
+
+ // Settle to make sure master is ready for incoming requests, i.e.,
+ // '_recover()' completes.
+ Clock::settle();
+
+ // Advance before the test so that the 1st call to Metrics endpoint
+ // is not throttled. MetricsProcess which hosts the endpoint
+ // throttles requests at 2qps and its singleton instance is shared
+ // across tests.
+ Clock::advance(Milliseconds(501));
+
+ // 1. Register two frameworks.
+
+ // 1.1. Create the first framework.
FrameworkInfo frameworkInfo1; // Bug in gcc 4.1.*, must assign on next line.
frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
frameworkInfo1.set_principal("framework1");
@@ -167,16 +395,24 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
MesosSchedulerDriver* driver1 =
new MesosSchedulerDriver(&sched1, frameworkInfo1, master.get());
- {
- Future<Nothing> registered;
- EXPECT_CALL(sched1, registered(driver1, _, _))
- .WillOnce(FutureSatisfy(&registered));
+ EXPECT_CALL(sched1, registered(driver1, _, _))
+ .Times(1);
- ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+ // Grab the stuff we need to replay the RegisterFrameworkMessage
+ // for sched1.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage1 = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage1 = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
- AWAIT_READY(registered);
- }
+ ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+
+ AWAIT_READY(registerFrameworkMessage1);
+ AWAIT_READY(frameworkRegisteredMessage1);
+ const process::UPID sched1Pid = frameworkRegisteredMessage1.get().to;
+
+ // 1.2. Create the second framework.
FrameworkInfo frameworkInfo2; // Bug in gcc 4.1.*, must assign on next line.
frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
frameworkInfo2.set_principal("framework2");
@@ -184,17 +420,123 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
MockScheduler sched2;
MesosSchedulerDriver driver2(&sched2, frameworkInfo2, master.get());
+ EXPECT_CALL(sched2, registered(&driver2, _, _))
+ .Times(1);
+
+ // Grab the stuff we need to replay the RegisterFrameworkMessage
+ // for sched2.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage2 = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage2 = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+ ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+ AWAIT_READY(registerFrameworkMessage2);
+ AWAIT_READY(frameworkRegisteredMessage2);
+
+ const process::UPID sched2Pid = frameworkRegisteredMessage2.get().to;
+
+ // 2. Send duplicate RegisterFrameworkMessages from the two
+ // schedulers to Master.
+
+ // The first messages are not throttled because they are at the
+ // head of the queue.
{
- Future<Nothing> registered;
- EXPECT_CALL(sched2, registered(&driver2, _, _))
- .WillOnce(FutureSatisfy(&registered));
+ Future<process::Message> duplicateFrameworkRegisteredMessage1 =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ sched1Pid);
+ Future<process::Message> duplicateFrameworkRegisteredMessage2 =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ sched2Pid);
+
+ process::post(sched1Pid, master.get(), registerFrameworkMessage1.get());
+ process::post(sched2Pid, master.get(), registerFrameworkMessage2.get());
- ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+ AWAIT_READY(duplicateFrameworkRegisteredMessage1);
+ AWAIT_READY(duplicateFrameworkRegisteredMessage2);
+ }
- AWAIT_READY(registered);
+ // Send the second batch of messages which should be throttled.
+ {
+ Future<process::Message> duplicateFrameworkRegisteredMessage1 =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ sched1Pid);
+ Future<process::Message> duplicateFrameworkRegisteredMessage2 =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ sched2Pid);
+
+ process::post(sched1Pid, master.get(), registerFrameworkMessage1.get());
+ process::post(sched2Pid, master.get(), registerFrameworkMessage2.get());
+
+ // Settle to make sure the pending futures below are indeed due
+ // to throttling.
+ Clock::settle();
+
+ EXPECT_TRUE(duplicateFrameworkRegisteredMessage1.isPending());
+ EXPECT_TRUE(duplicateFrameworkRegisteredMessage2.isPending());
+
+ {
+ // Verify counters also indicate that messages are received but
+ // not processed.
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework1/messages_received"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework1/messages_processed"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework2/messages_received"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework2/messages_processed"));
+
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework1/messages_received"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework2/messages_received"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 1,
+ metrics.values["frameworks/framework1/messages_processed"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 1,
+ metrics.values["frameworks/framework2/messages_processed"]
+ .as<JSON::Number>().value);
+ }
+
+ // Advance for a second so the message from framework1 (1qps)
+ // should be processed.
+ Clock::advance(Seconds(1));
+ AWAIT_READY(duplicateFrameworkRegisteredMessage1);
+ EXPECT_TRUE(duplicateFrameworkRegisteredMessage2.isPending());
+
+ // Framework1's message is processed and framework2's is not
+ // because it's throttled at a lower rate.
+ JSON::Object metrics = METRICS_SNAPSHOT;
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework1/messages_processed"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 1,
+ metrics.values["frameworks/framework2/messages_processed"]
+ .as<JSON::Number>().value);
+
+ // After another second framework2 (0.5qps)'s message is
+ // processed as well.
+ Clock::advance(Seconds(1));
+ AWAIT_READY(duplicateFrameworkRegisteredMessage2);
}
- // 2. Verify that both frameworks have message counters added.
+ // 2. Counters confirm that both frameworks' messages are processed.
{
JSON::Object metrics = METRICS_SNAPSHOT;
@@ -206,9 +548,26 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
1u, metrics.values.count("frameworks/framework2/messages_received"));
EXPECT_EQ(
1u, metrics.values.count("frameworks/framework2/messages_processed"));
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework1/messages_received"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework2/messages_received"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework1/messages_processed"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework2/messages_processed"]
+ .as<JSON::Number>().value);
}
- // 3. Remove a framework.
+ // 3. Remove a framework and its message counters are deleted while
+ // the other framework's counters stay.
Future<Nothing> frameworkRemoved =
FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
@@ -216,22 +575,28 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
driver1->join();
delete driver1;
+ // No need to advance again because we already advanced 1sec for
+ // sched2 so the RateLimiter for sched1 doesn't impose a delay this
+ // time.
AWAIT_READY(frameworkRemoved);
- // 4. Its message counters are deleted while the other framework's
- // counters stay.
- {
- JSON::Object metrics = METRICS_SNAPSHOT;
+ // Settle to avoid the race between the removal of the counters and
+ // the metrics endpoint query.
+ Clock::settle();
- EXPECT_EQ(
- 0u, metrics.values.count("frameworks/framework1/messages_received"));
- EXPECT_EQ(
- 0u, metrics.values.count("frameworks/framework1/messages_processed"));
- EXPECT_EQ(
- 1u, metrics.values.count("frameworks/framework2/messages_received"));
- EXPECT_EQ(
- 1u, metrics.values.count("frameworks/framework2/messages_processed"));
- }
+ // Advance for Metrics rate limiting.
+ Clock::advance(Milliseconds(501));
+
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ EXPECT_EQ(
+ 0u, metrics.values.count("frameworks/framework1/messages_received"));
+ EXPECT_EQ(
+ 0u, metrics.values.count("frameworks/framework1/messages_processed"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework2/messages_received"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework2/messages_processed"));
driver2.stop();
driver2.join();
@@ -241,56 +606,135 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
// Verify that if multiple frameworks use the same principal, they
-// share the same counters and removing one framework doesn't remove
-// the counters.
-TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
+// share the same counters, are throttled at the same rate and
+// removing one framework doesn't remove the counters.
+TEST_F(RateLimitingTest, SamePrincipalFrameworks)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
+ Clock::pause();
+
+ // Settle to make sure master is ready for incoming requests, i.e.,
+ // '_recover()' completes.
+ Clock::settle();
+
+ // Advance before the test so that the 1st call to Metrics endpoint
+ // is not throttled. MetricsProcess which hosts the endpoint
+ // throttles requests at 2qps and its singleton instance is shared
+ // across tests.
+ Clock::advance(Milliseconds(501));
+
+ // 1. Register two frameworks.
+
+ // 1.1. Create the first framework.
MockScheduler sched1;
// Create MesosSchedulerDriver on the heap because of the need to
// destroy it during the test due to MESOS-1456.
MesosSchedulerDriver* driver1 = new MesosSchedulerDriver(
&sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
- {
- Future<Nothing> registered;
- EXPECT_CALL(sched1, registered(driver1, _, _))
- .WillOnce(FutureSatisfy(&registered));
+ EXPECT_CALL(sched1, registered(driver1, _, _))
+ .Times(1);
+
+ // Grab the stuff we need to replay the RegisterFrameworkMessage
+ // for sched1.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage1 = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage1 = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
- ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+ ASSERT_EQ(DRIVER_RUNNING, driver1->start());
- AWAIT_READY(registered);
- }
+ AWAIT_READY(registerFrameworkMessage1);
+ AWAIT_READY(frameworkRegisteredMessage1);
+
+ const process::UPID sched1Pid = frameworkRegisteredMessage1.get().to;
+
+ // 1.2. Create the second framework.
// 'sched2' uses the same principal "test-principal".
MockScheduler sched2;
MesosSchedulerDriver driver2(
&sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
- {
- Future<Nothing> registered;
- EXPECT_CALL(sched2, registered(&driver2, _, _))
- .WillOnce(FutureSatisfy(&registered));
+ EXPECT_CALL(sched2, registered(&driver2, _, _))
+ .Times(1);
- ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+ // Grab the stuff we need to replay the RegisterFrameworkMessage
+ // for sched2.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage2 = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage2 = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
- AWAIT_READY(registered);
- }
+ ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+ AWAIT_READY(registerFrameworkMessage2);
+ AWAIT_READY(frameworkRegisteredMessage2);
+
+ const process::UPID sched2Pid = frameworkRegisteredMessage2.get().to;
// Message counters added after both frameworks are registered.
{
JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
- 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_received"));
+ 1u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_received"));
EXPECT_EQ(
- 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_processed"));
+ 1u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_processed"));
+ }
+
+ // The 1st message from sched1 is not throttled as it's at the head
+ // of the queue but the 1st message from sched2 is because it's
+ // throttled by the same RateLimiter.
+ Future<process::Message> duplicateFrameworkRegisteredMessage1 =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ sched1Pid);
+ Future<process::Message> duplicateFrameworkRegisteredMessage2 =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ sched2Pid);
+
+ process::post(sched1Pid, master.get(), registerFrameworkMessage1.get());
+ process::post(sched2Pid, master.get(), registerFrameworkMessage2.get());
+
+ AWAIT_READY(duplicateFrameworkRegisteredMessage1);
+
+ // Settle to make sure the pending future is indeed caused by
+ // throttling.
+ Clock::settle();
+ EXPECT_TRUE(duplicateFrameworkRegisteredMessage2.isPending());
+
+ // For metrics endpoint.
+ Clock::advance(Milliseconds(501));
+
+ {
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ // Two messages received and one processed.
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
+
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
}
+ // Advance for another half a second to make sure throttled
+ // message is processed.
+ Clock::advance(Milliseconds(501));
+
+ AWAIT_READY(duplicateFrameworkRegisteredMessage2);
+
Future<Nothing> frameworkRemoved =
FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
@@ -298,19 +742,29 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
driver1->join();
delete driver1;
+ // Advance to let UnregisterFrameworkMessage come through.
+ Clock::settle();
+ Clock::advance(Seconds(1));
+
AWAIT_READY(frameworkRemoved);
// Message counters are not removed after the first framework is
// unregistered.
+
+ // For metrics endpoint.
+ Clock::advance(Milliseconds(501));
+
{
JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
- 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_received"));
+ 1u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_received"));
EXPECT_EQ(
- 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_processed"));
+ 1u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_processed"));
}
driver2.stop();
@@ -321,12 +775,24 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
// Verify that when a scheduler fails over, the new scheduler
-// instance continues to use the same counters.
-TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
+// instance continues to use the same counters and RateLimiter.
+TEST_F(RateLimitingTest, SchedulerFailover)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
+ Clock::pause();
+
+ // Settle to make sure master is ready for incoming requests, i.e.,
+ // '_recover()' completes.
+ Clock::settle();
+
+ // Advance before the test so that the 1st call to Metrics endpoint
+ // is not throttled. MetricsProcess which hosts the endpoint
+ // throttles requests at 2qps and its singleton instance is shared
+ // across tests.
+ Clock::advance(Milliseconds(501));
+
// 1. Launch the first (i.e., failing) scheduler and verify its
// counters.
@@ -339,11 +805,9 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
.WillOnce(FutureArg<1>(&frameworkId));
{
- // Grab this message so we can resend it.
+ // Grab the stuff we need to replay the RegisterFrameworkMessage.
Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
RegisterFrameworkMessage(), _, master.get());
-
- // Grab this message to get the scheduler's pid.
Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
@@ -369,9 +833,7 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
AWAIT_READY(duplicateFrameworkRegisteredMessage);
// Settle to make sure message_processed counters are updated.
- Clock::pause();
Clock::settle();
- Clock::resume();
// Verify the message counters.
JSON::Object metrics = METRICS_SNAPSHOT;
@@ -379,12 +841,12 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
// One message received and processed after the framework is
// registered.
const string& messages_received =
- "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
EXPECT_EQ(1u, metrics.values.count(messages_received));
EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
const string& messages_processed =
- "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
EXPECT_EQ(1u, metrics.values.count(messages_processed));
EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
}
@@ -410,147 +872,88 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
EXPECT_CALL(sched1, error(&driver1, "Framework failed over"))
.WillOnce(FutureSatisfy(&sched1Error));
- {
- // Grab this message to get the scheduler's pid and to make sure we
- // wait until the framework is registered.
- Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
- Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+ // Grab the stuff we need to replay the ReregisterFrameworkMessage.
+ Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+ Future<ReregisterFrameworkMessage> reregisterFrameworkMessage =
+ FUTURE_PROTOBUF(ReregisterFrameworkMessage(), _, master.get());
- // Grab this message so we can resend it.
- Future<ReregisterFrameworkMessage> reregisterFrameworkMessage =
- FUTURE_PROTOBUF(ReregisterFrameworkMessage(), _, master.get());
+ driver2.start();
- driver2.start();
+ AWAIT_READY(reregisterFrameworkMessage);
+ AWAIT_READY(sched1Error);
+ AWAIT_READY(frameworkRegisteredMessage);
- AWAIT_READY(reregisterFrameworkMessage);
- AWAIT_READY(sched1Error);
- AWAIT_READY(frameworkRegisteredMessage);
+ const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
- const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
- Future<process::Message> duplicateFrameworkRegisteredMessage =
- FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
- master.get(),
- _);
+ // Sending a duplicate ReregisterFrameworkMessage to test the
+ // message counters with the new scheduler instance.
+ process::post(schedulerPid, master.get(), reregisterFrameworkMessage.get());
- // Sending a duplicate ReregisterFrameworkMessage to test the
- // message counters with the new scheduler instance.
- process::post(schedulerPid, master.get(), reregisterFrameworkMessage.get());
+ // Settle to make sure everything not delayed is processed.
+ Clock::settle();
- AWAIT_READY(duplicateFrameworkRegisteredMessage);
+ // Throttled because the same RateLimiter instance is
+ // throttling the new scheduler instance.
+ EXPECT_TRUE(duplicateFrameworkRegisteredMessage.isPending());
- // Settle to make sure message_processed counters are updated.
- Clock::pause();
- Clock::settle();
- Clock::resume();
+ // Advance for metrics.
+ Clock::advance(Milliseconds(501));
+ {
JSON::Object metrics = METRICS_SNAPSHOT;
- // Another message after sched2 is reregistered plus the one from
- // the sched1.
+ // Verify that counters correctly indicate the message is
+ // received but not processed.
const string& messages_received =
- "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
EXPECT_EQ(1u, metrics.values.count(messages_received));
EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
const string& messages_processed =
- "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
EXPECT_EQ(1u, metrics.values.count(messages_processed));
- EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value);
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
}
- EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
- EXPECT_EQ(DRIVER_STOPPED, driver2.join());
+ // Need another half a second to have it processed.
+ Clock::advance(Milliseconds(501));
- EXPECT_EQ(DRIVER_ABORTED, driver1.stop());
- EXPECT_EQ(DRIVER_STOPPED, driver1.join());
-
- Shutdown();
-}
-
-
-// Verify that a framework is being correctly throttled at the
-// configured rate.
-TEST_F(RateLimitingTest, ThrottleFramework)
-{
- // Throttle at 1 QPS.
- RateLimits limits;
- RateLimit* limit = limits.mutable_limits()->Add();
- limit->set_principal(DEFAULT_CREDENTIAL.principal());
- limit->set_qps(1);
- master::Flags flags = CreateMasterFlags();
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
- flags.rate_limits = JSON::Protobuf(limits);
-
- Try<PID<Master> > master = StartMaster(flags);
- ASSERT_SOME(master);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+ // Advance for metrics.
+ Clock::advance(Milliseconds(501));
- EXPECT_CALL(sched, registered(&driver, _, _))
- .Times(1);
-
- // Grab this message so we can resend it.
- Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
- RegisterFrameworkMessage(), _, master.get());
-
- // Grab this message to get the scheduler's pid and to make sure we
- // wait until the framework is registered.
- Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
- Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
-
- ASSERT_EQ(DRIVER_RUNNING, driver.start());
-
- AWAIT_READY(registerFrameworkMessage);
- AWAIT_READY(frameworkRegisteredMessage);
-
- const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
-
- Clock::pause();
-
- // Keep sending duplicate RegisterFrameworkMessages. Master sends
- // FrameworkRegisteredMessage back after processing each of them.
{
- Future<process::Message> duplicateFrameworkRegisteredMessage =
- FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
- master.get(),
- _);
-
- process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
- Clock::settle();
-
- // The first message is not throttled because it's at the head of
- // the queue.
- AWAIT_READY(duplicateFrameworkRegisteredMessage);
- }
-
- {
- Future<process::Message> duplicateFrameworkRegisteredMessage =
- FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
- master.get(),
- _);
-
- process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
- Clock::settle();
-
- // The 2nd message is throttled for a second.
- Clock::advance(Milliseconds(500));
- Clock::settle();
-
- EXPECT_TRUE(duplicateFrameworkRegisteredMessage.isPending());
+ JSON::Object metrics = METRICS_SNAPSHOT;
- Clock::advance(Milliseconds(501));
- Clock::settle();
+ // Another message after sched2 is reregistered plus the one from
+ // the sched1.
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
- AWAIT_READY(duplicateFrameworkRegisteredMessage);
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value);
}
- Clock::resume();
+ EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
+ EXPECT_EQ(DRIVER_STOPPED, driver2.join());
- EXPECT_EQ(DRIVER_STOPPED, driver.stop());
- EXPECT_EQ(DRIVER_STOPPED, driver.join());
+ EXPECT_EQ(DRIVER_ABORTED, driver1.stop());
+ EXPECT_EQ(DRIVER_STOPPED, driver1.join());
Shutdown();
}
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
[6/6] git commit: Refactored the querying and parsing of metrics
snapshot into a MACRO.
Posted by ya...@apache.org.
Refactored the querying and parsing of metrics snapshot into a MACRO.
Review: https://reviews.apache.org/r/22639
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a6236a43
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a6236a43
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a6236a43
Branch: refs/heads/master
Commit: a6236a43d19dfd95b665463215db50525375efbc
Parents: 1b2596d
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Fri Jun 13 19:57:08 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Mon Jun 23 11:19:40 2014 -0700
----------------------------------------------------------------------
src/tests/rate_limiting_tests.cpp | 136 +++++++--------------------------
1 file changed, 26 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a6236a43/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index 9a54461..2a37fc1 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -51,6 +51,23 @@ using testing::_;
using testing::Eq;
using testing::Return;
+// Query Mesos metrics snapshot endpoint and return a JSON::Object
+// result.
+#define METRICS_SNAPSHOT \
+ ({ Future<process::http::Response> response = \
+ process::http::get(MetricsProcess::instance()->self(), "snapshot"); \
+ AWAIT_READY(response); \
+ \
+ EXPECT_SOME_EQ( \
+ "application/json", \
+ response.get().headers.get("Content-Type")); \
+ \
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); \
+ ASSERT_SOME(parse); \
+ \
+ parse.get(); })
+
+
// This test case covers tests related to framework API rate limiting
// which includes metrics exporting for API call rates.
class RateLimitingTest : public MesosTest {};
@@ -65,20 +82,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
// Message counters not present before the framework registers.
{
- // TODO(xujyan): It would be nice to refactor out the common
- // metrics snapshot querying logic into a helper method/MACRO.
- Future<process::http::Response> response =
- process::http::get(MetricsProcess::instance()->self(), "snapshot");
- AWAIT_READY(response);
-
- EXPECT_SOME_EQ(
- "application/json",
- response.get().headers.get("Content-Type"));
-
- Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
- ASSERT_SOME(parse);
-
- JSON::Object metrics = parse.get();
+ JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
@@ -105,18 +109,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
// Message counters added after the framework is registered.
{
- Future<process::http::Response> response =
- process::http::get(MetricsProcess::instance()->self(), "snapshot");
- AWAIT_READY(response);
-
- EXPECT_SOME_EQ(
- "application/json",
- response.get().headers.get("Content-Type"));
-
- Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
- ASSERT_SOME(parse);
-
- JSON::Object metrics = parse.get();
+ JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
@@ -138,18 +131,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
// Message counter removed after the framework is unregistered.
{
- Future<process::http::Response> response =
- process::http::get(MetricsProcess::instance()->self(), "snapshot");
- AWAIT_READY(response);
-
- EXPECT_SOME_EQ(
- "application/json",
- response.get().headers.get("Content-Type"));
-
- Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
- ASSERT_SOME(parse);
-
- JSON::Object metrics = parse.get();
+ JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
@@ -214,18 +196,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
// 2. Verify that both frameworks have message counters added.
{
- Future<process::http::Response> response =
- process::http::get(MetricsProcess::instance()->self(), "snapshot");
- AWAIT_READY(response);
-
- EXPECT_SOME_EQ(
- "application/json",
- response.get().headers.get("Content-Type"));
-
- Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
- ASSERT_SOME(parse);
-
- JSON::Object metrics = parse.get();
+ JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
1u, metrics.values.count("frameworks/framework1/messages_received"));
@@ -250,18 +221,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
// 4. Its message counters are deleted while the other framework's
// counters stay.
{
- Future<process::http::Response> response =
- process::http::get(MetricsProcess::instance()->self(), "snapshot");
- AWAIT_READY(response);
-
- EXPECT_SOME_EQ(
- "application/json",
- response.get().headers.get("Content-Type"));
-
- Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
- ASSERT_SOME(parse);
-
- JSON::Object metrics = parse.get();
+ JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
0u, metrics.values.count("frameworks/framework1/messages_received"));
@@ -321,18 +281,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
// Message counters added after both frameworks are registered.
{
- Future<process::http::Response> response =
- process::http::get(MetricsProcess::instance()->self(), "snapshot");
- AWAIT_READY(response);
-
- EXPECT_SOME_EQ(
- "application/json",
- response.get().headers.get("Content-Type"));
-
- Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
- ASSERT_SOME(parse);
-
- JSON::Object metrics = parse.get();
+ JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
@@ -354,18 +303,7 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
// Message counters are not removed after the first framework is
// unregistered.
{
- Future<process::http::Response> response =
- process::http::get(MetricsProcess::instance()->self(), "snapshot");
- AWAIT_READY(response);
-
- EXPECT_SOME_EQ(
- "application/json",
- response.get().headers.get("Content-Type"));
-
- Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
- ASSERT_SOME(parse);
-
- JSON::Object metrics = parse.get();
+ JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
@@ -436,18 +374,7 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
Clock::resume();
// Verify the message counters.
- Future<process::http::Response> response =
- process::http::get(MetricsProcess::instance()->self(), "snapshot");
- AWAIT_READY(response);
-
- EXPECT_SOME_EQ(
- "application/json",
- response.get().headers.get("Content-Type"));
-
- Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
- ASSERT_SOME(parse);
-
- JSON::Object metrics = parse.get();
+ JSON::Object metrics = METRICS_SNAPSHOT;
// One message received and processed after the framework is
// registered.
@@ -517,18 +444,7 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
Clock::settle();
Clock::resume();
- Future<process::http::Response> response =
- process::http::get(MetricsProcess::instance()->self(), "snapshot");
- AWAIT_READY(response);
-
- EXPECT_SOME_EQ(
- "application/json",
- response.get().headers.get("Content-Type"));
-
- Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
- ASSERT_SOME(parse);
-
- JSON::Object metrics = parse.get();
+ JSON::Object metrics = METRICS_SNAPSHOT;
// Another message after sched2 is reregistered plus the one from
// the sched1.
[5/6] git commit: Added flags::parse() overload for RateLimits
protobuf.
Posted by ya...@apache.org.
Added flags::parse() overload for RateLimits protobuf.
Review: https://reviews.apache.org/r/22758
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1cd0a072
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1cd0a072
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1cd0a072
Branch: refs/heads/master
Commit: 1cd0a072d9dafe8d6835ec02afca13b0124cae8c
Parents: ac44eb5
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Wed Jun 18 13:39:59 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Mon Jun 23 11:19:40 2014 -0700
----------------------------------------------------------------------
src/common/parse.hpp | 14 ++++++++++++++
src/common/type_utils.hpp | 8 ++++++++
src/master/flags.hpp | 2 +-
src/master/master.cpp | 10 +---------
src/tests/rate_limiting_tests.cpp | 6 +++---
5 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/1cd0a072/src/common/parse.hpp
----------------------------------------------------------------------
diff --git a/src/common/parse.hpp b/src/common/parse.hpp
index 5b12e38..4eb5096 100644
--- a/src/common/parse.hpp
+++ b/src/common/parse.hpp
@@ -38,6 +38,20 @@ inline Try<mesos::ACLs> parse(const std::string& value)
return protobuf::parse<mesos::ACLs>(json.get());
}
+
+template<>
+inline Try<mesos::RateLimits> parse(const std::string& value)
+{
+ // Convert from string or file to JSON.
+ Try<JSON::Object> json = parse<JSON::Object>(value);
+ if (json.isError()) {
+ return Error(json.error());
+ }
+
+ // Convert from JSON to Protobuf.
+ return protobuf::parse<mesos::RateLimits>(json.get());
+}
+
} // namespace flags {
#endif // __COMMON_PARSE_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/1cd0a072/src/common/type_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.hpp b/src/common/type_utils.hpp
index 27ea4d2..bb357ac 100644
--- a/src/common/type_utils.hpp
+++ b/src/common/type_utils.hpp
@@ -122,6 +122,14 @@ inline std::ostream& operator << (
}
+inline std::ostream& operator << (
+ std::ostream& stream,
+ const RateLimits& limits)
+{
+ return stream << limits.DebugString();
+}
+
+
inline bool operator == (const FrameworkID& left, const FrameworkID& right)
{
return left.value() == right.value();
http://git-wip-us.apache.org/repos/asf/mesos/blob/1cd0a072/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index 47bb0dc..70751d2 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -287,7 +287,7 @@ public:
bool authenticate_slaves;
Option<std::string> credentials;
Option<ACLs> acls;
- Option<JSON::Object> rate_limits;
+ Option<RateLimits> rate_limits;
};
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/1cd0a072/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 72470da..f376e60 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -46,7 +46,6 @@
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
-#include <stout/protobuf.hpp>
#include <stout/stringify.hpp>
#include <stout/utils.hpp>
#include <stout/uuid.hpp>
@@ -355,15 +354,8 @@ void Master::initialize()
}
if (flags.rate_limits.isSome()) {
- Try<RateLimits> limits =
- ::protobuf::parse<RateLimits>(flags.rate_limits.get());
- if (limits.isError()) {
- EXIT(1) << "Invalid RateLimits format: " << limits.error()
- << " (see --rate_limits flag)";
- }
-
// Add framework rate limiters.
- foreach (const RateLimit& limit_, limits.get().limits()) {
+ foreach (const RateLimit& limit_, flags.rate_limits.get().limits()) {
if (limiters.contains(limit_.principal())) {
EXIT(1) << "Duplicate principal " << limit_.principal()
<< " found in RateLimits configuration";
http://git-wip-us.apache.org/repos/asf/mesos/blob/1cd0a072/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index 9775b77..9cb3717 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -88,7 +88,7 @@ public:
// metrics endpoint (because it also throttles requests but at
// 2qps) don't mess with framework rate limiting.
limit->set_qps(1);
- flags.rate_limits = JSON::Protobuf(limits);
+ flags.rate_limits = limits;
return flags;
}
@@ -106,7 +106,7 @@ TEST_F(RateLimitingTest, NoRateLimiting)
RateLimits limits;
RateLimit* limit = limits.mutable_limits()->Add();
limit->set_principal(DEFAULT_CREDENTIAL.principal());
- flags.rate_limits = JSON::Protobuf(limits);
+ flags.rate_limits = limits;
Try<PID<Master> > master = StartMaster(flags);
ASSERT_SOME(master);
@@ -363,7 +363,7 @@ TEST_F(RateLimitingTest, DifferentPrincipalFrameworks)
RateLimit* limit2 = limits.mutable_limits()->Add();
limit2->set_principal("framework2");
limit2->set_qps(0.5);
- flags.rate_limits = JSON::Protobuf(limits);
+ flags.rate_limits = limits;
flags.authenticate_frameworks = false;
[4/6] git commit: Removed the wait in StartMaster() for the master to
get elected before returning because it's no longer necessary.
Posted by ya...@apache.org.
Removed the wait in StartMaster() for the master to get elected before returning because it's no longer necessary.
- Cluster::Masters::start now waits for the master to be recovered, which happens only after it's elected.
Review: https://reviews.apache.org/r/22856
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1b2596d6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1b2596d6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1b2596d6
Branch: refs/heads/master
Commit: 1b2596d6174d284909bdef5548c524ba2fdae305
Parents: bd8343a
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Fri Jun 20 00:37:42 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Mon Jun 23 11:19:40 2014 -0700
----------------------------------------------------------------------
src/tests/cluster.hpp | 2 ++
src/tests/mesos.cpp | 53 ++++++----------------------------------------
src/tests/mesos.hpp | 19 +++--------------
3 files changed, 11 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/1b2596d6/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 1c96ee7..a83a60e 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -423,6 +423,8 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
// Speed up the tests by ensuring that the Master is recovered
// before the test proceeds. Otherwise, authentication and
// registration messages may be dropped, causing delayed retries.
+ // NOTE: The tests may still need to settle the Clock while it's
+ // paused to ensure that the Master finishes executing _recover().
if (!_recover.await(Seconds(10))) {
LOG(FATAL) << "Failed to wait for _recover";
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/1b2596d6/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 1037420..24af32b 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -45,8 +45,6 @@ using std::string;
using namespace process;
-using testing::_;
-
namespace mesos {
namespace internal {
namespace tests {
@@ -159,67 +157,28 @@ slave::Flags MesosTest::CreateSlaveFlags()
Try<process::PID<master::Master> > MesosTest::StartMaster(
- const Option<master::Flags>& flags,
- bool wait)
+ const Option<master::Flags>& flags)
{
- Future<Nothing> detected = FUTURE_DISPATCH(_, &master::Master::detected);
-
- Try<process::PID<master::Master> > master = cluster.masters.start(
+ return cluster.masters.start(
flags.isNone() ? CreateMasterFlags() : flags.get());
-
- // Wait until the leader is detected because otherwise this master
- // may reject authentication requests because it doesn't know it's
- // the leader yet [MESOS-881].
- if (wait && master.isSome() && !detected.await(Seconds(10))) {
- return Error("Failed to wait " + stringify(Seconds(10)) +
- " for master to detect the leader");
- }
-
- return master;
}
Try<process::PID<master::Master> > MesosTest::StartMaster(
master::allocator::AllocatorProcess* allocator,
- const Option<master::Flags>& flags,
- bool wait)
+ const Option<master::Flags>& flags)
{
- Future<Nothing> detected = FUTURE_DISPATCH(_, &master::Master::detected);
-
- Try<process::PID<master::Master> > master = cluster.masters.start(
+ return cluster.masters.start(
allocator, flags.isNone() ? CreateMasterFlags() : flags.get());
-
- // Wait until the leader is detected because otherwise this master
- // may reject authentication requests because it doesn't know it's
- // the leader yet [MESOS-881].
- if (wait && master.isSome() && !detected.await(Seconds(10))) {
- return Error("Failed to wait " + stringify(Seconds(10)) +
- " for master to detect the leader");
- }
-
- return master;
}
Try<process::PID<master::Master> > MesosTest::StartMaster(
Authorizer* authorizer,
- const Option<master::Flags>& flags,
- bool wait)
+ const Option<master::Flags>& flags)
{
- Future<Nothing> detected = FUTURE_DISPATCH(_, &master::Master::detected);
-
- Try<process::PID<master::Master> > master = cluster.masters.start(
+ return cluster.masters.start(
authorizer, flags.isNone() ? CreateMasterFlags() : flags.get());
-
- // Wait until the leader is detected because otherwise this master
- // may reject authentication requests because it doesn't know it's
- // the leader yet [MESOS-881].
- if (wait && master.isSome() && !detected.await(Seconds(10))) {
- return Error("Failed to wait " + stringify(Seconds(10)) +
- " for master to detect the leader");
- }
-
- return master;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/1b2596d6/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 0b9b2f9..c40c82d 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -83,33 +83,20 @@ protected:
virtual slave::Flags CreateSlaveFlags();
// Starts a master with the specified flags.
- // Waits for the master to detect a leader (could be itself) before
- // returning if 'wait' is set to true.
- // TODO(xujyan): Return a future which becomes ready when the
- // master detects a leader (when wait == true) and have the tests
- // do AWAIT_READY.
virtual Try<process::PID<master::Master> > StartMaster(
- const Option<master::Flags>& flags = None(),
- bool wait = true);
+ const Option<master::Flags>& flags = None());
// Starts a master with the specified allocator process and flags.
- // Waits for the master to detect a leader (could be itself) before
- // returning if 'wait' is set to true.
- // TODO(xujyan): Return a future which becomes ready when the
- // master detects a leader (when wait == true) and have the tests
- // do AWAIT_READY.
virtual Try<process::PID<master::Master> > StartMaster(
master::allocator::AllocatorProcess* allocator,
- const Option<master::Flags>& flags = None(),
- bool wait = true);
+ const Option<master::Flags>& flags = None());
// Starts a master with the specified authorizer and flags.
// Waits for the master to detect a leader (could be itself) before
// returning if 'wait' is set to true.
virtual Try<process::PID<master::Master> > StartMaster(
Authorizer* authorizer,
- const Option<master::Flags>& flags = None(),
- bool wait = true);
+ const Option<master::Flags>& flags = None());
// Starts a slave with the specified flags.
virtual Try<process::PID<slave::Slave> > StartSlave(