You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2014/08/07 09:17:27 UTC
git commit: Improved framework rate limiting by imposing the max
number of outstanding messages per framework principal.
Repository: mesos
Updated Branches:
refs/heads/master 62086313d -> 4a6e69e69
Improved framework rate limiting by imposing the max number of outstanding messages per framework principal.
Review: https://reviews.apache.org/r/24343
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4a6e69e6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4a6e69e6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4a6e69e6
Branch: refs/heads/master
Commit: 4a6e69e69e04caba58dae8fe656ad2d81a8a6eb5
Parents: 6208631
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Wed Aug 6 16:31:53 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Thu Aug 7 00:15:51 2014 -0700
----------------------------------------------------------------------
include/mesos/mesos.proto | 25 +++-
src/examples/load_generator_framework.cpp | 7 +-
src/master/master.cpp | 109 +++++++++++++----
src/master/master.hpp | 37 +++++-
src/tests/rate_limiting_tests.cpp | 160 +++++++++++++++++++++++++
5 files changed, 303 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a6e69e6/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 628cce1..efb4239 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -779,26 +779,39 @@ message ACLs {
* principal.
*/
message RateLimit {
- // Leaving QPS unset gives it unlimited rate (i.e., not throttled).
+ // Leaving QPS unset gives it unlimited rate (i.e., not throttled),
+ // which also implies unlimited capacity.
optional double qps = 1;
// Principal of framework(s) to be throttled. Should match
// FrameworkInfo.principal and Credential.principal (if using authentication).
required string principal = 2;
+
+ // Max number of outstanding messages from frameworks of this principal
+ // allowed by master before the next message is dropped and an error is sent
+ // back to the sender. Messages received before the capacity is reached are
+ // still going to be processed after the error is sent.
+ // If unspecified, this principal is assigned unlimited capacity.
+ // NOTE: This value is ignored if 'qps' is not set.
+ optional uint64 capacity = 3;
}
/**
* Collection of RateLimit.
- * Frameworks without rate limits defined here are not throttled
- * unless 'aggregate_default_qps' is specified.
+ * Frameworks without rate limits defined here are not throttled unless
+ * 'aggregate_default_qps' is specified.
*/
message RateLimits {
// Items should have unique principals.
repeated RateLimit limits = 1;
- // All the frameworks not specified in 'limits' get this default
- // rate. This rate is an aggregate rate for all of them, i.e.,
- // their combined traffic is throttled together at this rate.
+ // All the frameworks not specified in 'limits' get this default rate.
+ // This rate is an aggregate rate for all of them, i.e., their combined
+ // traffic is throttled together at this rate.
optional double aggregate_default_qps = 2;
+
+ // All the frameworks not specified in 'limits' get this default capacity.
+ // This is an aggregate value similar to 'aggregate_default_qps'.
+ optional uint64 aggregate_default_capacity = 3;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a6e69e6/src/examples/load_generator_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/load_generator_framework.cpp b/src/examples/load_generator_framework.cpp
index 7d94c49..01a567b 100644
--- a/src/examples/load_generator_framework.cpp
+++ b/src/examples/load_generator_framework.cpp
@@ -211,7 +211,12 @@ public:
const SlaveID&,
int) {}
- virtual void error(SchedulerDriver*, const string&) {}
+ virtual void error(SchedulerDriver*, const string& error)
+ {
+ // Terminating process with EXIT here because we cannot interrupt
+ // LoadGenerator's long-running loop.
+ EXIT(1) << "Error received: " << error;
+ }
private:
LoadGenerator* generator;
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a6e69e6/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 97e4340..d279edb 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -84,7 +84,6 @@ using process::Owned;
using process::PID;
using process::Process;
using process::Promise;
-using process::RateLimiter;
using process::Time;
using process::Timer;
using process::UPID;
@@ -371,12 +370,18 @@ void Master::initialize()
<< ". It must be a positive number";
}
- limiters.put(
- limit_.principal(),
- limit_.has_qps()
- ? Option<Owned<RateLimiter> >::some(
- Owned<RateLimiter>(new RateLimiter(limit_.qps())))
- : Option<Owned<RateLimiter> >::none());
+ if (limit_.has_qps()) {
+ Option<uint64_t> capacity;
+ if (limit_.has_capacity()) {
+ capacity = limit_.capacity();
+ }
+ limiters.put(
+ limit_.principal(),
+ Owned<BoundedRateLimiter>(
+ new BoundedRateLimiter(limit_.qps(), capacity)));
+ } else {
+ limiters.put(limit_.principal(), None());
+ }
}
if (flags.rate_limits.get().has_aggregate_default_qps() &&
@@ -387,8 +392,13 @@ void Master::initialize()
}
if (flags.rate_limits.get().has_aggregate_default_qps()) {
- defaultLimiter = Owned<RateLimiter>(
- new RateLimiter(flags.rate_limits.get().aggregate_default_qps()));
+ Option<uint64_t> capacity;
+ if (flags.rate_limits.get().has_aggregate_default_capacity()) {
+ capacity = flags.rate_limits.get().aggregate_default_capacity();
+ }
+ defaultLimiter = Owned<BoundedRateLimiter>(
+ new BoundedRateLimiter(
+ flags.rate_limits.get().aggregate_default_qps(), capacity));
}
LOG(INFO) << "Framework rate limiting enabled";
@@ -851,9 +861,6 @@ void Master::visit(const MessageEvent& event)
return;
}
- // Necessary to disambiguate below.
- typedef void(Self::*F)(const MessageEvent&);
-
// Throttle the message if it's a framework message and a
// RateLimiter is configured for the framework's principal.
// The framework is throttled by the default RateLimiter if:
@@ -864,30 +871,42 @@ void Master::visit(const MessageEvent& event)
// 1) the default RateLimiter is not configured to handle case 2)
// above. (or)
// 2) the principal exists in RateLimits but 'qps' is not set.
- if (principal.isSome() &&
- limiters.contains(principal.get()) &&
- limiters[principal.get()].isSome()) {
- limiters[principal.get()].get()->acquire()
- .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+ Option<Owned<BoundedRateLimiter> > limiter;
+ if (principal.isSome() && limiters.contains(principal.get())) {
+ limiter = limiters[principal.get()];
} else if ((principal.isNone() || !limiters.contains(principal.get())) &&
- isRegisteredFramework &&
- defaultLimiter.isSome()) {
- defaultLimiter.get()->acquire()
- .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+ isRegisteredFramework) {
+ limiter = defaultLimiter;
+ }
+
+ // Now throttle the message if a limiter is found, unless its
+ // capacity is already reached.
+ if (limiter.isSome()) {
+ if (limiter.get()->capacity.isNone() ||
+ limiter.get()->messages < limiter.get()->capacity.get()) {
+ limiter.get()->messages++;
+ limiter.get()->limiter->acquire()
+ .onReady(defer(self(), &Self::throttled, event, principal));
+ } else {
+ exceededCapacity(
+ event,
+ principal,
+ limiter.get()->capacity.get());
+ }
} else {
_visit(event);
}
}
-void Master::visit(const process::ExitedEvent& event)
+void Master::visit(const ExitedEvent& event)
{
// See comments in 'visit(const MessageEvent& event)' for which
// RateLimiter is used to throttle this UPID and when it is not
// throttled.
// Note that throttling ExitedEvent is necessary so the order
// between MessageEvents and ExitedEvents from the same PID is
- // maintained.
+ // maintained. Also ExitedEvents are not subject to the capacity.
bool isRegisteredFramework = frameworks.principals.contains(event.pid);
const Option<string> principal = isRegisteredFramework
? frameworks.principals[event.pid]
@@ -899,12 +918,12 @@ void Master::visit(const process::ExitedEvent& event)
if (principal.isSome() &&
limiters.contains(principal.get()) &&
limiters[principal.get()].isSome()) {
- limiters[principal.get()].get()->acquire()
+ limiters[principal.get()].get()->limiter->acquire()
.onReady(defer(self(), static_cast<F>(&Self::_visit), event));
} else if ((principal.isNone() || !limiters.contains(principal.get())) &&
isRegisteredFramework &&
defaultLimiter.isSome()) {
- defaultLimiter.get()->acquire()
+ defaultLimiter.get()->limiter->acquire()
.onReady(defer(self(), static_cast<F>(&Self::_visit), event));
} else {
_visit(event);
@@ -912,6 +931,22 @@ void Master::visit(const process::ExitedEvent& event)
}
+void Master::throttled(
+ const MessageEvent& event,
+ const Option<std::string>& principal)
+{
+ // We already know a RateLimiter is used to throttle this event so
+ // here we only need to determine which.
+ if (principal.isSome()) {
+ limiters[principal.get()].get()->messages--;
+ } else {
+ defaultLimiter.get()->messages--;
+ }
+
+ _visit(event);
+}
+
+
void Master::_visit(const MessageEvent& event)
{
// Obtain the principal before processing the Message because the
@@ -936,6 +971,30 @@ void Master::_visit(const MessageEvent& event)
}
+void Master::exceededCapacity(
+ const MessageEvent& event,
+ const Option<string>& principal,
+ uint64_t capacity)
+{
+ LOG(WARNING) << "Dropping message " << event.message->name << " from "
+ << event.message->from
+ << (principal.isSome() ? "(" + principal.get() + ")" : "")
+ << ": capacity(" << capacity << ") exceeded";
+
+ // Send an error to the framework which will abort the scheduler
+ // driver.
+ // NOTE: The scheduler driver will send back a
+ // DeactivateFrameworkMessage which may be dropped as well but this
+ // should be fine because the scheduler is already informed of an
+ // unrecoverable error and should take action to recover.
+ FrameworkErrorMessage message;
+ message.set_message(
+ "Message " + event.message->name +
+ " dropped: capacity(" + stringify(capacity) + ") exceeded");
+ send(event.message->from, message);
+}
+
+
void Master::_visit(const ExitedEvent& event)
{
Process<Master>::visit(event);
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a6e69e6/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d8a4d9e..29e8f49 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -241,10 +241,25 @@ protected:
virtual void visit(const process::MessageEvent& event);
virtual void visit(const process::ExitedEvent& event);
+ // Invoked when the message is ready to be executed after
+ // being throttled.
+ // 'principal' being None indicates it is throttled by
+ // 'defaultLimiter'.
+ void throttled(
+ const process::MessageEvent& event,
+ const Option<std::string>& principal);
+
// Continuations of visit().
void _visit(const process::MessageEvent& event);
void _visit(const process::ExitedEvent& event);
+ // Helper method invoked when the capacity for a framework
+ // principal is exceeded.
+ void exceededCapacity(
+ const process::MessageEvent& event,
+ const Option<std::string>& principal,
+ uint64_t capacity);
+
// Recovers state from the registrar.
process::Future<Nothing> recover();
void recoveredSlavesTimeout(const Registry& registry);
@@ -744,14 +759,30 @@ private:
const FrameworkInfo& frameworkInfo,
const process::UPID& from);
- // RateLimiters keyed by the framework principal.
+ struct BoundedRateLimiter
+ {
+ BoundedRateLimiter(double qps, Option<uint64_t> _capacity)
+ : limiter(new process::RateLimiter(qps)),
+ capacity(_capacity),
+ messages(0) {}
+
+ process::Owned<process::RateLimiter> limiter;
+ const Option<uint64_t> capacity;
+
+ // Number of outstanding messages for this RateLimiter.
+ // NOTE: ExitedEvents are throttled but not counted towards
+ // the capacity here.
+ uint64_t messages;
+ };
+
+ // BoundedRateLimiters keyed by the framework principal.
// Like Metrics::Frameworks, all frameworks of the same principal
// are throttled together at a common rate limit.
- hashmap<std::string, Option<process::Owned<process::RateLimiter> > > limiters;
+ hashmap<std::string, Option<process::Owned<BoundedRateLimiter> > > limiters;
// The default limiter is for frameworks not specified in
// 'flags.rate_limits'.
- Option<process::Owned<process::RateLimiter> > defaultLimiter;
+ Option<process::Owned<BoundedRateLimiter> > defaultLimiter;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a6e69e6/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index fc23a19..4adebd1 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -954,6 +954,166 @@ TEST_F(RateLimitingTest, SchedulerFailover)
Shutdown();
}
+
+TEST_F(RateLimitingTest, CapacityReached)
+{
+ master::Flags flags = CreateMasterFlags();
+ RateLimits limits;
+ RateLimit* limit = limits.mutable_limits()->Add();
+ limit->set_principal(DEFAULT_CREDENTIAL.principal());
+ limit->set_qps(1);
+ limit->set_capacity(2);
+ flags.rate_limits = limits;
+
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ Clock::pause();
+
+ // Advance before the test so that the 1st call to Metrics endpoint
+ // is not throttled. MetricsProcess which hosts the endpoint
+ // throttles requests at 2qps and its singleton instance is shared
+ // across tests.
+ Clock::advance(Milliseconds(501));
+
+ MockScheduler sched;
+
+ FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+ frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+
+ // Use a long failover timeout so the master doesn't unregister the
+ // framework right away when it aborts.
+ frameworkInfo.set_failover_timeout(10);
+
+ // Create MesosSchedulerDriver on the heap because of the need to
+ // destroy it during the test due to MESOS-1456.
+ MesosSchedulerDriver* driver = new MesosSchedulerDriver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(driver, _, _))
+ .Times(1);
+
+ // Grab the stuff we need to replay the RegisterFrameworkMessage.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+ ASSERT_EQ(DRIVER_RUNNING, driver->start());
+
+ AWAIT_READY(registerFrameworkMessage);
+ AWAIT_READY(frameworkRegisteredMessage);
+
+ const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+ // Keep sending duplicate RegisterFrameworkMessages. Master sends
+ // FrameworkRegisteredMessage back after processing each of them.
+ {
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+
+ // The first message is not throttled because it's at the head of
+ // the queue.
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+ // Verify that one message is received and processed (after
+ // registration).
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
+
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+ }
+
+ // The subsequent messages are going to be throttled.
+ Future<process::Message> frameworkErrorMessage =
+ FUTURE_MESSAGE(Eq(FrameworkErrorMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ // Send two messages which will be queued up. This will reach but not
+ // exceed the capacity.
+ for (int i = 0; i < 2; i++) {
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+ }
+
+ // Settle to make sure no error is sent just yet.
+ Clock::settle();
+ EXPECT_TRUE(frameworkErrorMessage.isPending());
+
+ // The 3rd message results in an immediate error.
+ Future<Nothing> error;
+ EXPECT_CALL(sched, error(
+ driver,
+ "Message mesos.internal.RegisterFrameworkMessage dropped: capacity(2) "
+ "exceeded"))
+ .WillOnce(FutureSatisfy(&error));
+
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+ AWAIT_READY(frameworkErrorMessage);
+
+ // Settle to make sure scheduler aborts and its
+ // DeactivateFrameworkMessage is received by master.
+ Clock::settle();
+
+ AWAIT_READY(error);
+
+ // Stop the driver but indicate it wants to failover.
+ EXPECT_EQ(DRIVER_ABORTED, driver->stop(true));
+ EXPECT_EQ(DRIVER_STOPPED, driver->join());
+ delete driver;
+
+ // Wait for half a second for metrics endpoint.
+ Clock::advance(Milliseconds(501));
+
+ {
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(5, metrics.values[messages_received].as<JSON::Number>().value);
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ // Four messages not processed, two in the queue and two dropped.
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+ }
+
+ // Advance three times for the two pending messages and the exited
+ // event to be processed.
+ for (int i = 0; i < 3; i++) {
+ Clock::advance(Milliseconds(1001));
+ Clock::settle();
+ }
+
+ // Counters are not removed because the scheduler is not
+ // unregistered and the master expects it to failover.
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(5, metrics.values[messages_received].as<JSON::Number>().value);
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ // Two messages are dropped.
+ EXPECT_EQ(3, metrics.values[messages_processed].as<JSON::Number>().value);
+
+ Shutdown();
+}
+
} // namespace master {
} // namespace internal {
} // namespace mesos {