You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2014/06/23 20:25:45 UTC
[2/6] git commit: Added more tests for framework rate limiting.
Added more tests for framework rate limiting.
Review: https://reviews.apache.org/r/22740
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ac44eb59
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ac44eb59
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ac44eb59
Branch: refs/heads/master
Commit: ac44eb59beef16cf7383bc79834d7a4b62fe5567
Parents: a6236a4
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Fri Jun 20 14:56:37 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Mon Jun 23 11:19:40 2014 -0700
----------------------------------------------------------------------
src/tests/rate_limiting_tests.cpp | 805 +++++++++++++++++++++++++--------
1 file changed, 604 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac44eb59/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index 2a37fc1..9775b77 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -51,6 +51,10 @@ using testing::_;
using testing::Eq;
using testing::Return;
+namespace mesos {
+namespace internal {
+namespace master {
+
// Query Mesos metrics snapshot endpoint and return a JSON::Object
// result.
#define METRICS_SNAPSHOT \
@@ -70,27 +74,68 @@ using testing::Return;
// This test case covers tests related to framework API rate limiting
// which includes metrics exporting for API call rates.
-class RateLimitingTest : public MesosTest {};
+class RateLimitingTest : public MesosTest
+{
+public:
+ virtual master::Flags CreateMasterFlags()
+ {
+ master::Flags flags = MesosTest::CreateMasterFlags();
+
+ RateLimits limits;
+ RateLimit* limit = limits.mutable_limits()->Add();
+ limit->set_principal(DEFAULT_CREDENTIAL.principal());
+ // Set 1qps so that the half-second Clock::advance()s for
+ // metrics endpoint (because it also throttles requests but at
+ // 2qps) don't mess with framework rate limiting.
+ limit->set_qps(1);
+ flags.rate_limits = JSON::Protobuf(limits);
+
+ return flags;
+ }
+};
// Verify that message counters for a framework are added when a
-// framework registers and removed when it terminates.
-TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
+// framework registers, removed when it terminates and count messages
+// correctly when it is given unlimited rate.
+TEST_F(RateLimitingTest, NoRateLimiting)
{
- Try<PID<Master> > master = StartMaster();
+ // Give the framework unlimited rate explicitly by specifying a
+ // RateLimit entry without 'qps'.
+ master::Flags flags = CreateMasterFlags();
+ RateLimits limits;
+ RateLimit* limit = limits.mutable_limits()->Add();
+ limit->set_principal(DEFAULT_CREDENTIAL.principal());
+ flags.rate_limits = JSON::Protobuf(limits);
+
+ Try<PID<Master> > master = StartMaster(flags);
ASSERT_SOME(master);
+ Clock::pause();
+
+ // Settle to make sure master is ready for incoming requests, i.e.,
+ // '_recover()' completes.
+ Clock::settle();
+
+ // Advance before the test so that the 1st call to Metrics endpoint
+ // is not throttled. MetricsProcess which hosts the endpoint
+ // throttles requests at 2qps and its singleton instance is shared
+ // across tests.
+ Clock::advance(Milliseconds(501));
+
// Message counters not present before the framework registers.
{
JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
- 0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_received"));
+ 0u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_received"));
EXPECT_EQ(
- 0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_processed"));
+ 0u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_processed"));
}
MockScheduler sched;
@@ -99,25 +144,49 @@ TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
MesosSchedulerDriver* driver = new MesosSchedulerDriver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
- Future<Nothing> registered;
EXPECT_CALL(sched, registered(driver, _, _))
- .WillOnce(FutureSatisfy(&registered));
+ .Times(1);
+
+ // Grab the stuff we need to replay the RegisterFrameworkMessage.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
ASSERT_EQ(DRIVER_RUNNING, driver->start());
- AWAIT_READY(registered);
+ AWAIT_READY(registerFrameworkMessage);
+ AWAIT_READY(frameworkRegisteredMessage);
+
+ const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+ // For metrics endpoint.
+ Clock::advance(Milliseconds(501));
- // Message counters added after the framework is registered.
+ // Send a duplicate RegisterFrameworkMessage. Master sends
+ // FrameworkRegisteredMessage back after processing it.
{
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+ // Verify that one message is received and processed (after
+ // registration).
JSON::Object metrics = METRICS_SNAPSHOT;
- EXPECT_EQ(
- 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_received"));
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
- EXPECT_EQ(
- 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_processed"));
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
}
Future<Nothing> frameworkRemoved =
@@ -127,36 +196,195 @@ TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
driver->join();
delete driver;
+ // The fact that UnregisterFrameworkMessage (the 2nd message from
+ // 'sched' that reaches Master after its registration) gets
+ // processed without Clock advances proves that the framework is
+ // given unlimited rate.
AWAIT_READY(frameworkRemoved);
- // Message counter removed after the framework is unregistered.
+ // For metrics endpoint.
+ Clock::advance(Milliseconds(501));
+
+ // Message counters removed after the framework is unregistered.
{
JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
- 0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_received"));
+ 0u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_received"));
EXPECT_EQ(
- 0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_processed"));
+ 0u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_processed"));
}
Shutdown();
}
-// Verify that framework message counters work with frameworks of
-// different principals.
-TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
+// Verify that a framework is being correctly throttled at the
+// configured rate.
+TEST_F(RateLimitingTest, RateLimitingEnabled)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ Clock::pause();
+
+ // Settle to make sure master is ready for incoming requests, i.e.,
+ // '_recover()' completes.
+ Clock::settle();
+
+ // Advance before the test so that the 1st call to Metrics endpoint
+ // is not throttled. MetricsProcess which hosts the endpoint
+ // throttles requests at 2qps and its singleton instance is shared
+ // across tests.
+ Clock::advance(Milliseconds(501));
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ // Grab the stuff we need to replay the RegisterFrameworkMessage.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+ ASSERT_EQ(DRIVER_RUNNING, driver.start());
+
+ AWAIT_READY(registerFrameworkMessage);
+ AWAIT_READY(frameworkRegisteredMessage);
+
+ const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+ // Keep sending duplicate RegisterFrameworkMessages. Master sends
+ // FrameworkRegisteredMessage back after processing each of them.
+ {
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+
+ // The first message is not throttled because it's at the head of
+ // the queue.
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+ // Verify that one message is received and processed (after
+ // registration).
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
+
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+ }
+
+ // The 2nd message is throttled for a second.
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+
+ // Advance for half a second and verify that the message is still
+ // not processed.
+ Clock::advance(Milliseconds(501));
+
+ // Settle to make sure all events not delayed are processed.
+ Clock::settle();
+
+ {
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+
+ // The 2nd message is received but not processed after half
+ // a second because of throttling.
+ EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+ EXPECT_TRUE(duplicateFrameworkRegisteredMessage.isPending());
+ }
+
+ // After another half a second the message should be processed.
+ Clock::advance(Milliseconds(501));
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+ // Verify counters after processing of the message.
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+
+ EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
+ EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value);
+
+ EXPECT_EQ(DRIVER_STOPPED, driver.stop());
+ EXPECT_EQ(DRIVER_STOPPED, driver.join());
+
+ Shutdown();
+}
+
+
+// Verify that framework message counters and rate limiters work with
+// frameworks of different principals which are throttled at
+// different rates.
+TEST_F(RateLimitingTest, DifferentPrincipalFrameworks)
{
- // 1. Register two frameworks.
master::Flags flags = CreateMasterFlags();
+
+ // Configure RateLimits to be 1qps and 0.5qps for two frameworks.
+ RateLimits limits;
+ RateLimit* limit1 = limits.mutable_limits()->Add();
+ limit1->set_principal("framework1");
+ limit1->set_qps(1);
+ RateLimit* limit2 = limits.mutable_limits()->Add();
+ limit2->set_principal("framework2");
+ limit2->set_qps(0.5);
+ flags.rate_limits = JSON::Protobuf(limits);
+
flags.authenticate_frameworks = false;
Try<PID<Master> > master = StartMaster(flags);
ASSERT_SOME(master);
+ Clock::pause();
+
+ // Settle to make sure master is ready for incoming requests, i.e.,
+ // '_recover()' completes.
+ Clock::settle();
+
+ // Advance before the test so that the 1st call to Metrics endpoint
+ // is not throttled. MetricsProcess which hosts the endpoint
+ // throttles requests at 2qps and its singleton instance is shared
+ // across tests.
+ Clock::advance(Milliseconds(501));
+
+ // 1. Register two frameworks.
+
+ // 1.1. Create the first framework.
FrameworkInfo frameworkInfo1; // Bug in gcc 4.1.*, must assign on next line.
frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
frameworkInfo1.set_principal("framework1");
@@ -167,16 +395,24 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
MesosSchedulerDriver* driver1 =
new MesosSchedulerDriver(&sched1, frameworkInfo1, master.get());
- {
- Future<Nothing> registered;
- EXPECT_CALL(sched1, registered(driver1, _, _))
- .WillOnce(FutureSatisfy(&registered));
+ EXPECT_CALL(sched1, registered(driver1, _, _))
+ .Times(1);
- ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+ // Grab the stuff we need to replay the RegisterFrameworkMessage
+ // for sched1.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage1 = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage1 = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
- AWAIT_READY(registered);
- }
+ ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+
+ AWAIT_READY(registerFrameworkMessage1);
+ AWAIT_READY(frameworkRegisteredMessage1);
+ const process::UPID sched1Pid = frameworkRegisteredMessage1.get().to;
+
+ // 1.2. Create the second framework.
FrameworkInfo frameworkInfo2; // Bug in gcc 4.1.*, must assign on next line.
frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
frameworkInfo2.set_principal("framework2");
@@ -184,17 +420,123 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
MockScheduler sched2;
MesosSchedulerDriver driver2(&sched2, frameworkInfo2, master.get());
+ EXPECT_CALL(sched2, registered(&driver2, _, _))
+ .Times(1);
+
+ // Grab the stuff we need to replay the RegisterFrameworkMessage
+ // for sched2.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage2 = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage2 = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+ ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+ AWAIT_READY(registerFrameworkMessage2);
+ AWAIT_READY(frameworkRegisteredMessage2);
+
+ const process::UPID sched2Pid = frameworkRegisteredMessage2.get().to;
+
+ // 2. Send duplicate RegisterFrameworkMessages from the two
+ // schedulers to Master.
+
+ // The first messages are not throttled because they are at the
+ // head of the queue.
{
- Future<Nothing> registered;
- EXPECT_CALL(sched2, registered(&driver2, _, _))
- .WillOnce(FutureSatisfy(&registered));
+ Future<process::Message> duplicateFrameworkRegisteredMessage1 =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ sched1Pid);
+ Future<process::Message> duplicateFrameworkRegisteredMessage2 =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ sched2Pid);
+
+ process::post(sched1Pid, master.get(), registerFrameworkMessage1.get());
+ process::post(sched2Pid, master.get(), registerFrameworkMessage2.get());
- ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+ AWAIT_READY(duplicateFrameworkRegisteredMessage1);
+ AWAIT_READY(duplicateFrameworkRegisteredMessage2);
+ }
- AWAIT_READY(registered);
+ // Send the second batch of messages which should be throttled.
+ {
+ Future<process::Message> duplicateFrameworkRegisteredMessage1 =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ sched1Pid);
+ Future<process::Message> duplicateFrameworkRegisteredMessage2 =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ sched2Pid);
+
+ process::post(sched1Pid, master.get(), registerFrameworkMessage1.get());
+ process::post(sched2Pid, master.get(), registerFrameworkMessage2.get());
+
+ // Settle to make sure the pending futures below are indeed due
+ // to throttling.
+ Clock::settle();
+
+ EXPECT_TRUE(duplicateFrameworkRegisteredMessage1.isPending());
+ EXPECT_TRUE(duplicateFrameworkRegisteredMessage2.isPending());
+
+ {
+ // Verify counters also indicate that messages are received but
+ // not processed.
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework1/messages_received"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework1/messages_processed"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework2/messages_received"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework2/messages_processed"));
+
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework1/messages_received"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework2/messages_received"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 1,
+ metrics.values["frameworks/framework1/messages_processed"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 1,
+ metrics.values["frameworks/framework2/messages_processed"]
+ .as<JSON::Number>().value);
+ }
+
+ // Advance for a second so the message from framework1 (1qps)
+ // should be processed.
+ Clock::advance(Seconds(1));
+ AWAIT_READY(duplicateFrameworkRegisteredMessage1);
+ EXPECT_TRUE(duplicateFrameworkRegisteredMessage2.isPending());
+
+ // Framework1's message is processed and framework2's is not
+ // because it's throttled at a lower rate.
+ JSON::Object metrics = METRICS_SNAPSHOT;
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework1/messages_processed"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 1,
+ metrics.values["frameworks/framework2/messages_processed"]
+ .as<JSON::Number>().value);
+
+ // After another second framework2 (0.5qps)'s message is
+ // processed as well.
+ Clock::advance(Seconds(1));
+ AWAIT_READY(duplicateFrameworkRegisteredMessage2);
}
- // 2. Verify that both frameworks have message counters added.
+ // 2. Counters confirm that both frameworks' messages are processed.
{
JSON::Object metrics = METRICS_SNAPSHOT;
@@ -206,9 +548,26 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
1u, metrics.values.count("frameworks/framework2/messages_received"));
EXPECT_EQ(
1u, metrics.values.count("frameworks/framework2/messages_processed"));
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework1/messages_received"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework2/messages_received"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework1/messages_processed"]
+ .as<JSON::Number>().value);
+ EXPECT_EQ(
+ 2,
+ metrics.values["frameworks/framework2/messages_processed"]
+ .as<JSON::Number>().value);
}
- // 3. Remove a framework.
+ // 3. Remove a framework and its message counters are deleted while
+ // the other framework's counters stay.
Future<Nothing> frameworkRemoved =
FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
@@ -216,22 +575,28 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
driver1->join();
delete driver1;
+ // No need to advance again because we already advanced 1sec for
+ // sched2 so the RateLimiter for sched1 doesn't impose a delay this
+ // time.
AWAIT_READY(frameworkRemoved);
- // 4. Its message counters are deleted while the other framework's
- // counters stay.
- {
- JSON::Object metrics = METRICS_SNAPSHOT;
+ // Settle to avoid the race between the removal of the counters and
+ // the metrics endpoint query.
+ Clock::settle();
- EXPECT_EQ(
- 0u, metrics.values.count("frameworks/framework1/messages_received"));
- EXPECT_EQ(
- 0u, metrics.values.count("frameworks/framework1/messages_processed"));
- EXPECT_EQ(
- 1u, metrics.values.count("frameworks/framework2/messages_received"));
- EXPECT_EQ(
- 1u, metrics.values.count("frameworks/framework2/messages_processed"));
- }
+ // Advance for Metrics rate limiting.
+ Clock::advance(Milliseconds(501));
+
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ EXPECT_EQ(
+ 0u, metrics.values.count("frameworks/framework1/messages_received"));
+ EXPECT_EQ(
+ 0u, metrics.values.count("frameworks/framework1/messages_processed"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework2/messages_received"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework2/messages_processed"));
driver2.stop();
driver2.join();
@@ -241,56 +606,135 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
// Verify that if multiple frameworks use the same principal, they
-// share the same counters and removing one framework doesn't remove
-// the counters.
-TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
+// share the same counters, are throttled at the same rate and
+// removing one framework doesn't remove the counters.
+TEST_F(RateLimitingTest, SamePrincipalFrameworks)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
+ Clock::pause();
+
+ // Settle to make sure master is ready for incoming requests, i.e.,
+ // '_recover()' completes.
+ Clock::settle();
+
+ // Advance before the test so that the 1st call to Metrics endpoint
+ // is not throttled. MetricsProcess which hosts the endpoint
+ // throttles requests at 2qps and its singleton instance is shared
+ // across tests.
+ Clock::advance(Milliseconds(501));
+
+ // 1. Register two frameworks.
+
+ // 1.1. Create the first framework.
MockScheduler sched1;
// Create MesosSchedulerDriver on the heap because of the need to
// destroy it during the test due to MESOS-1456.
MesosSchedulerDriver* driver1 = new MesosSchedulerDriver(
&sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
- {
- Future<Nothing> registered;
- EXPECT_CALL(sched1, registered(driver1, _, _))
- .WillOnce(FutureSatisfy(&registered));
+ EXPECT_CALL(sched1, registered(driver1, _, _))
+ .Times(1);
+
+ // Grab the stuff we need to replay the RegisterFrameworkMessage
+ // for sched1.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage1 = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage1 = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
- ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+ ASSERT_EQ(DRIVER_RUNNING, driver1->start());
- AWAIT_READY(registered);
- }
+ AWAIT_READY(registerFrameworkMessage1);
+ AWAIT_READY(frameworkRegisteredMessage1);
+
+ const process::UPID sched1Pid = frameworkRegisteredMessage1.get().to;
+
+ // 1.2. Create the second framework.
// 'sched2' uses the same principal "test-principal".
MockScheduler sched2;
MesosSchedulerDriver driver2(
&sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
- {
- Future<Nothing> registered;
- EXPECT_CALL(sched2, registered(&driver2, _, _))
- .WillOnce(FutureSatisfy(&registered));
+ EXPECT_CALL(sched2, registered(&driver2, _, _))
+ .Times(1);
- ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+ // Grab the stuff we need to replay the RegisterFrameworkMessage
+ // for sched2.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage2 = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage2 = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
- AWAIT_READY(registered);
- }
+ ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+ AWAIT_READY(registerFrameworkMessage2);
+ AWAIT_READY(frameworkRegisteredMessage2);
+
+ const process::UPID sched2Pid = frameworkRegisteredMessage2.get().to;
// Message counters added after both frameworks are registered.
{
JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
- 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_received"));
+ 1u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_received"));
EXPECT_EQ(
- 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_processed"));
+ 1u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_processed"));
+ }
+
+ // The 1st message from sched1 is not throttled as it's at the head
+ // of the queue but the 1st message from sched2 is because it's
+ // throttled by the same RateLimiter.
+ Future<process::Message> duplicateFrameworkRegisteredMessage1 =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ sched1Pid);
+ Future<process::Message> duplicateFrameworkRegisteredMessage2 =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ sched2Pid);
+
+ process::post(sched1Pid, master.get(), registerFrameworkMessage1.get());
+ process::post(sched2Pid, master.get(), registerFrameworkMessage2.get());
+
+ AWAIT_READY(duplicateFrameworkRegisteredMessage1);
+
+ // Settle to make sure the pending future is indeed caused by
+ // throttling.
+ Clock::settle();
+ EXPECT_TRUE(duplicateFrameworkRegisteredMessage2.isPending());
+
+ // For metrics endpoint.
+ Clock::advance(Milliseconds(501));
+
+ {
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ // Two messages received and one processed.
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
+
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
}
+ // Advance for another half a second to make sure throttled
+ // message is processed.
+ Clock::advance(Milliseconds(501));
+
+ AWAIT_READY(duplicateFrameworkRegisteredMessage2);
+
Future<Nothing> frameworkRemoved =
FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
@@ -298,19 +742,29 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
driver1->join();
delete driver1;
+ // Advance to let UnregisterFrameworkMessage come through.
+ Clock::settle();
+ Clock::advance(Seconds(1));
+
AWAIT_READY(frameworkRemoved);
// Message counters are not removed after the first framework is
// unregistered.
+
+ // For metrics endpoint.
+ Clock::advance(Milliseconds(501));
+
{
JSON::Object metrics = METRICS_SNAPSHOT;
EXPECT_EQ(
- 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_received"));
+ 1u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_received"));
EXPECT_EQ(
- 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
- "/messages_processed"));
+ 1u,
+ metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() +
+ "/messages_processed"));
}
driver2.stop();
@@ -321,12 +775,24 @@ TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
// Verify that when a scheduler fails over, the new scheduler
-// instance continues to use the same counters.
-TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
+// instance continues to use the same counters and RateLimiter.
+TEST_F(RateLimitingTest, SchedulerFailover)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
+ Clock::pause();
+
+ // Settle to make sure master is ready for incoming requests, i.e.,
+ // '_recover()' completes.
+ Clock::settle();
+
+ // Advance before the test so that the 1st call to Metrics endpoint
+ // is not throttled. MetricsProcess which hosts the endpoint
+ // throttles requests at 2qps and its singleton instance is shared
+ // across tests.
+ Clock::advance(Milliseconds(501));
+
// 1. Launch the first (i.e., failing) scheduler and verify its
// counters.
@@ -339,11 +805,9 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
.WillOnce(FutureArg<1>(&frameworkId));
{
- // Grab this message so we can resend it.
+ // Grab the stuff we need to replay the RegisterFrameworkMessage.
Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
RegisterFrameworkMessage(), _, master.get());
-
- // Grab this message to get the scheduler's pid.
Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
@@ -369,9 +833,7 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
AWAIT_READY(duplicateFrameworkRegisteredMessage);
// Settle to make sure message_processed counters are updated.
- Clock::pause();
Clock::settle();
- Clock::resume();
// Verify the message counters.
JSON::Object metrics = METRICS_SNAPSHOT;
@@ -379,12 +841,12 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
// One message received and processed after the framework is
// registered.
const string& messages_received =
- "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
EXPECT_EQ(1u, metrics.values.count(messages_received));
EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
const string& messages_processed =
- "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
EXPECT_EQ(1u, metrics.values.count(messages_processed));
EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
}
@@ -410,147 +872,88 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
EXPECT_CALL(sched1, error(&driver1, "Framework failed over"))
.WillOnce(FutureSatisfy(&sched1Error));
- {
- // Grab this message to get the scheduler's pid and to make sure we
- // wait until the framework is registered.
- Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
- Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+ // Grab the messages we need to replay the ReregisterFrameworkMessage.
+ Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+ Future<ReregisterFrameworkMessage> reregisterFrameworkMessage =
+ FUTURE_PROTOBUF(ReregisterFrameworkMessage(), _, master.get());
- // Grab this message so we can resend it.
- Future<ReregisterFrameworkMessage> reregisterFrameworkMessage =
- FUTURE_PROTOBUF(ReregisterFrameworkMessage(), _, master.get());
+ driver2.start();
- driver2.start();
+ AWAIT_READY(reregisterFrameworkMessage);
+ AWAIT_READY(sched1Error);
+ AWAIT_READY(frameworkRegisteredMessage);
- AWAIT_READY(reregisterFrameworkMessage);
- AWAIT_READY(sched1Error);
- AWAIT_READY(frameworkRegisteredMessage);
+ const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
- const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
- Future<process::Message> duplicateFrameworkRegisteredMessage =
- FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
- master.get(),
- _);
+ // Sending a duplicate ReregisterFrameworkMessage to test the
+ // message counters with the new scheduler instance.
+ process::post(schedulerPid, master.get(), reregisterFrameworkMessage.get());
- // Sending a duplicate ReregisterFrameworkMessage to test the
- // message counters with the new scheduler instance.
- process::post(schedulerPid, master.get(), reregisterFrameworkMessage.get());
+ // Settle to make sure everything not delayed is processed.
+ Clock::settle();
- AWAIT_READY(duplicateFrameworkRegisteredMessage);
+ // Throttled because the same RateLimiter instance is
+ // throttling the new scheduler instance.
+ EXPECT_TRUE(duplicateFrameworkRegisteredMessage.isPending());
- // Settle to make sure message_processed counters are updated.
- Clock::pause();
- Clock::settle();
- Clock::resume();
+ // Advance for metrics.
+ Clock::advance(Milliseconds(501));
+ {
JSON::Object metrics = METRICS_SNAPSHOT;
- // Another message after sched2 is reregistered plus the one from
- // the sched1.
+ // Verify that the counters correctly indicate that the message
+ // is received but not processed.
const string& messages_received =
- "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
EXPECT_EQ(1u, metrics.values.count(messages_received));
EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
const string& messages_processed =
- "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
EXPECT_EQ(1u, metrics.values.count(messages_processed));
- EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value);
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
}
- EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
- EXPECT_EQ(DRIVER_STOPPED, driver2.join());
+ // Need another half a second to have it processed.
+ Clock::advance(Milliseconds(501));
- EXPECT_EQ(DRIVER_ABORTED, driver1.stop());
- EXPECT_EQ(DRIVER_STOPPED, driver1.join());
-
- Shutdown();
-}
-
-
-// Verify that a framework is being correctly throttled at the
-// configured rate.
-TEST_F(RateLimitingTest, ThrottleFramework)
-{
- // Throttle at 1 QPS.
- RateLimits limits;
- RateLimit* limit = limits.mutable_limits()->Add();
- limit->set_principal(DEFAULT_CREDENTIAL.principal());
- limit->set_qps(1);
- master::Flags flags = CreateMasterFlags();
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
- flags.rate_limits = JSON::Protobuf(limits);
-
- Try<PID<Master> > master = StartMaster(flags);
- ASSERT_SOME(master);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+ // Advance for metrics.
+ Clock::advance(Milliseconds(501));
- EXPECT_CALL(sched, registered(&driver, _, _))
- .Times(1);
-
- // Grab this message so we can resend it.
- Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
- RegisterFrameworkMessage(), _, master.get());
-
- // Grab this message to get the scheduler's pid and to make sure we
- // wait until the framework is registered.
- Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
- Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
-
- ASSERT_EQ(DRIVER_RUNNING, driver.start());
-
- AWAIT_READY(registerFrameworkMessage);
- AWAIT_READY(frameworkRegisteredMessage);
-
- const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
-
- Clock::pause();
-
- // Keep sending duplicate RegisterFrameworkMessages. Master sends
- // FrameworkRegisteredMessage back after processing each of them.
{
- Future<process::Message> duplicateFrameworkRegisteredMessage =
- FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
- master.get(),
- _);
-
- process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
- Clock::settle();
-
- // The first message is not throttled because it's at the head of
- // the queue.
- AWAIT_READY(duplicateFrameworkRegisteredMessage);
- }
-
- {
- Future<process::Message> duplicateFrameworkRegisteredMessage =
- FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
- master.get(),
- _);
-
- process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
- Clock::settle();
-
- // The 2nd message is throttled for a second.
- Clock::advance(Milliseconds(500));
- Clock::settle();
-
- EXPECT_TRUE(duplicateFrameworkRegisteredMessage.isPending());
+ JSON::Object metrics = METRICS_SNAPSHOT;
- Clock::advance(Milliseconds(501));
- Clock::settle();
+ // Another message after sched2 is reregistered plus the one from
+ // sched1.
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
- AWAIT_READY(duplicateFrameworkRegisteredMessage);
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value);
}
- Clock::resume();
+ EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
+ EXPECT_EQ(DRIVER_STOPPED, driver2.join());
- EXPECT_EQ(DRIVER_STOPPED, driver.stop());
- EXPECT_EQ(DRIVER_STOPPED, driver.join());
+ EXPECT_EQ(DRIVER_ABORTED, driver1.stop());
+ EXPECT_EQ(DRIVER_STOPPED, driver1.join());
Shutdown();
}
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {