You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2014/06/17 23:10:37 UTC
[1/5] git commit: Implemented framework API rate limiting.
Repository: mesos
Updated Branches:
refs/heads/master 7d7d94b1d -> a99de1a98
Implemented framework API rate limiting.
Review: https://reviews.apache.org/r/22427
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a99de1a9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a99de1a9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a99de1a9
Branch: refs/heads/master
Commit: a99de1a985316aa00242d65b6f85200fdefe6a90
Parents: ebdb81e
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Mon Jun 9 14:54:11 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Tue Jun 17 14:10:20 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 92 +++++++++++++++++++++++++++++++---
src/master/master.hpp | 23 +++++++--
src/tests/rate_limiting_tests.cpp | 88 +++++++++++++++++++++++++++++++-
3 files changed, 191 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a99de1a9/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c7357da..888657d 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -30,6 +30,7 @@
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/id.hpp>
+#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/run.hpp>
@@ -74,6 +75,7 @@ using std::vector;
using process::await;
using process::wait; // Necessary on some OS's to disambiguate.
using process::Clock;
+using process::ExitedEvent;
using process::Failure;
using process::Future;
using process::MessageEvent;
@@ -81,6 +83,7 @@ using process::Owned;
using process::PID;
using process::Process;
using process::Promise;
+using process::RateLimiter;
using process::Time;
using process::Timer;
using process::UPID;
@@ -352,14 +355,32 @@ void Master::initialize()
}
if (flags.rate_limits.isSome()) {
- Try<RateLimits> limits_ =
+ Try<RateLimits> limits =
::protobuf::parse<RateLimits>(flags.rate_limits.get());
- if (limits_.isError()) {
- EXIT(1) << "Invalid RateLimits format: " << limits_.error()
+ if (limits.isError()) {
+ EXIT(1) << "Invalid RateLimits format: " << limits.error()
<< " (see --rate_limits flag)";
}
- limits = limits_.get();
- LOG(INFO) << "Framework rate limiting enabled";
+
+ // Add framework rate limiters.
+ foreach (const RateLimit& limit_, limits.get().limits()) {
+ if (limiters.contains(limit_.principal())) {
+ EXIT(1) << "Duplicate principal " << limit_.principal()
+ << " found in RateLimits configuration";
+ }
+
+ if (limit_.has_qps() && limit_.qps() <= 0) {
+ EXIT(1) << "Invalid qps: " << limit_.qps()
+ << ". It must be a positive number";
+ }
+
+ limiters.put(
+ limit_.principal(),
+ limit_.has_qps()
+ ? Option<Owned<RateLimiter> >::some(
+ Owned<RateLimiter>(new RateLimiter(limit_.qps())))
+ : Option<Owned<RateLimiter> >::none());
+ }
}
hashmap<string, RoleInfo> roleInfos;
@@ -799,11 +820,64 @@ void Master::visit(const MessageEvent& event)
return;
}
+ // Throttle the message if it's a framework message and a
+ // RateLimiter is configured for the framework's principal.
+ // The framework is not throttled if:
+ // 1) the principal is not specified by the framework (or)
+ // 2) the principal doesn't exist in RateLimits configuration (or)
+ // 3) the principal exists in RateLimits but 'qps' is not set.
+ if (principal.isSome() &&
+ limiters.contains(principal.get()) &&
+ limiters[principal.get()].isSome()) {
+ // Necessary to disambiguate.
+ typedef void(Self::*F)(const MessageEvent&);
+
+ limiters[principal.get()].get()->acquire()
+ .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+ } else {
+ _visit(event);
+ }
+}
+
+
+void Master::visit(const process::ExitedEvent& event)
+{ // Entry point for ExitedEvents; may defer processing to '_visit'.
+ // Throttle the exit event if it comes from a framework and a
+ // RateLimiter is configured for the framework's principal.
+ // Throttling ExitedEvents is necessary so that an exit is not
+ // processed ahead of throttled MessageEvents from the same PID,
+ // i.e., the order between the two kinds of events is maintained.
+ // See comments in 'visit(const MessageEvent& event)' for when an
+ // event is not throttled.
+ const Option<string> principal = frameworks.principals.get(event.pid);
+ if (principal.isSome() &&
+ limiters.contains(principal.get()) &&
+ limiters[principal.get()].isSome()) {
+ // Necessary to disambiguate the overloaded '_visit' continuations.
+ typedef void(Self::*F)(const ExitedEvent&);
+
+ limiters[principal.get()].get()->acquire()
+ .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+ } else {
+ _visit(event); // Not throttled: process right away.
+ }
+}
+
+
+void Master::_visit(const MessageEvent& event)
+{
+ // Obtain the principal before processing the Message because the
+ // mapping may be deleted in handling 'UnregisterFrameworkMessage'
+ // but its counter still needs to be incremented for this message.
+ const Option<string> principal =
+ frameworks.principals.get(event.message->from);
+
ProtobufProcess<Master>::visit(event);
// Increment 'messages_processed' counter if it still exists.
// Note that it could be removed in handling
- // 'UnregisterFrameworkMessage'.
+ // 'UnregisterFrameworkMessage' if it's the last framework with
+ // this principal.
if (principal.isSome() && metrics.frameworks.contains(principal.get())) {
Counter messages_processed =
metrics.frameworks.get(principal.get()).get()->messages_processed;
@@ -812,6 +886,12 @@ void Master::visit(const MessageEvent& event)
}
+void Master::_visit(const ExitedEvent& event)
+{ // Continuation of visit(ExitedEvent): runs directly or once the principal's RateLimiter grants a permit.
+ Process<Master>::visit(event); // Hand the event to the base Process implementation.
+}
+
+
void fail(const string& message, const string& failure)
{
LOG(FATAL) << message << ": " << failure;
http://git-wip-us.apache.org/repos/asf/mesos/blob/a99de1a9/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d682613..2844446 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -59,6 +59,9 @@
#include "messages/messages.hpp"
+namespace process {
+class RateLimiter; // Forward declaration; defined in <process/limiter.hpp>.
+} // namespace process {
namespace mesos {
namespace internal {
@@ -235,6 +238,11 @@ protected:
virtual void finalize();
virtual void exited(const process::UPID& pid);
virtual void visit(const process::MessageEvent& event);
+ virtual void visit(const process::ExitedEvent& event);
+
+ // Continuations of visit().
+ void _visit(const process::MessageEvent& event);
+ void _visit(const process::ExitedEvent& event);
// Recovers state from the registrar.
process::Future<Nothing> recover();
@@ -510,8 +518,6 @@ private:
// Principals of authenticated frameworks/slaves keyed by PID.
hashmap<process::UPID, std::string> authenticated;
- Option<RateLimits> limits;
-
int64_t nextFrameworkId; // Used to give each framework a unique ID.
int64_t nextOfferId; // Used to give each slot offer a unique ID.
int64_t nextSlaveId; // Used to give each slave a unique ID.
@@ -572,9 +578,11 @@ private:
process::metrics::Counter messages_received;
// Framework messages processed.
- // NOTE: This doesn't include dropped messages. Also due to
- // Master's asynchronous nature, this doesn't necessarily mean
- // the work requested by this message has finished.
+ // NOTE: This doesn't include dropped messages. Processing of
+ // a message may be throttled by a RateLimiter if one is
+ // configured for this principal. Also due to Master's
+ // asynchronous nature, this doesn't necessarily mean the work
+ // requested by this message has finished.
process::metrics::Counter messages_processed;
explicit Frameworks(const std::string& principal)
@@ -707,6 +715,11 @@ private:
process::Future<Option<Error> > validate(
const FrameworkInfo& frameworkInfo,
const process::UPID& from);
+
+ // RateLimiters keyed by the framework principal.
+ // Like Metrics::Frameworks, all frameworks of the same principal
+ // are throttled together at a common rate limit.
+ hashmap<std::string, Option<process::Owned<process::RateLimiter> > > limiters;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/a99de1a9/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index baf912e..9a54461 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -48,7 +48,6 @@ using process::PID;
using std::string;
using testing::_;
-using testing::AtLeast;
using testing::Eq;
using testing::Return;
@@ -552,3 +551,90 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
Shutdown();
}
+
+
+// Verify that a framework is being correctly throttled at the
+// configured rate (1 QPS) by resending RegisterFrameworkMessages.
+TEST_F(RateLimitingTest, ThrottleFramework)
+{
+ // Throttle at 1 QPS.
+ RateLimits limits;
+ RateLimit* limit = limits.mutable_limits()->Add();
+ limit->set_principal(DEFAULT_CREDENTIAL.principal());
+ limit->set_qps(1);
+ master::Flags flags = CreateMasterFlags();
+
+ flags.rate_limits = JSON::Protobuf(limits);
+
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ // Grab this message so we can resend it.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+
+ // Grab this message to get the scheduler's pid and to make sure we
+ // wait until the framework is registered.
+ Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+ ASSERT_EQ(DRIVER_RUNNING, driver.start());
+
+ AWAIT_READY(registerFrameworkMessage);
+ AWAIT_READY(frameworkRegisteredMessage);
+
+ const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+ Clock::pause(); // From here on, time only moves via Clock::advance().
+
+ // Keep sending duplicate RegisterFrameworkMessages. Master sends
+ // FrameworkRegisteredMessage back after processing each of them.
+ {
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+ Clock::settle();
+
+ // The first message is not throttled because it's at the head of
+ // the queue.
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
+ }
+
+ {
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+ Clock::settle();
+
+ // The 2nd message is throttled for a second: still pending at 500ms.
+ Clock::advance(Milliseconds(500));
+ Clock::settle();
+
+ EXPECT_TRUE(duplicateFrameworkRegisteredMessage.isPending());
+
+ Clock::advance(Milliseconds(501)); // 1001ms total: past the 1 QPS interval.
+ Clock::settle();
+
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
+ }
+
+ Clock::resume();
+
+ EXPECT_EQ(DRIVER_STOPPED, driver.stop());
+ EXPECT_EQ(DRIVER_STOPPED, driver.join());
+
+ Shutdown();
+}
[5/5] git commit: Changed RateLimiter to work directly with 'double permitsPerSecond'.
Posted by ya...@apache.org.
Changed RateLimiter to work directly with 'double permitsPerSecond'.
Review: https://reviews.apache.org/r/22424
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4a0b3490
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4a0b3490
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4a0b3490
Branch: refs/heads/master
Commit: 4a0b34907bdc83f46b3cf61d7578b6939c296834
Parents: d19e588
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Mon Jun 9 14:42:56 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Tue Jun 17 14:10:20 2014 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/limiter.hpp | 30 ++++++++++++++------
1 file changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a0b3490/3rdparty/libprocess/include/process/limiter.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/limiter.hpp b/3rdparty/libprocess/include/process/limiter.hpp
index bbe8226..846ec09 100644
--- a/3rdparty/libprocess/include/process/limiter.hpp
+++ b/3rdparty/libprocess/include/process/limiter.hpp
@@ -5,6 +5,7 @@
#include <process/delay.hpp>
#include <process/dispatch.hpp>
+#include <process/id.hpp>
#include <process/future.hpp>
#include <process/process.hpp>
#include <process/timeout.hpp>
@@ -29,6 +30,7 @@ class RateLimiter
{
public:
RateLimiter(int permits, const Duration& duration);
+ explicit RateLimiter(double permitsPerSecond);
~RateLimiter();
// Returns a future that becomes ready when the permit is acquired.
@@ -46,11 +48,19 @@ private:
class RateLimiterProcess : public Process<RateLimiterProcess>
{
public:
- RateLimiterProcess(int _permits, const Duration& _duration)
- : permits(_permits), duration(_duration)
+ RateLimiterProcess(int permits, const Duration& duration)
+ : ProcessBase(ID::generate("__limiter__"))
{
CHECK_GT(permits, 0);
CHECK_GT(duration.secs(), 0);
+ permitsPerSecond = permits / duration.secs(); // Normalize to per-second; the only rate stored.
+ }
+
+ explicit RateLimiterProcess(double _permitsPerSecond)
+ : ProcessBase(ID::generate("__limiter__")),
+ permitsPerSecond(_permitsPerSecond)
+ {
+ CHECK_GT(permitsPerSecond, 0); // Rejects non-positive rates (and NaN, since NaN > 0 is false).
}
virtual void finalize()
@@ -78,8 +88,7 @@ public:
}
// No need to wait!
- double rate = permits / duration.secs();
- timeout = Seconds(1) / rate;
+ timeout = Seconds(1) / permitsPerSecond;
return Nothing();
}
@@ -97,8 +106,7 @@ private:
promise->set(Nothing());
- double rate = permits / duration.secs();
- timeout = Seconds(1) / rate;
+ timeout = Seconds(1) / permitsPerSecond;
// Repeat if necessary.
if (!promises.empty()) {
@@ -106,8 +114,7 @@ private:
}
}
- const int permits;
- const Duration duration;
+ double permitsPerSecond;
Timeout timeout;
@@ -122,6 +129,13 @@ inline RateLimiter::RateLimiter(int permits, const Duration& duration)
}
+inline RateLimiter::RateLimiter(double permitsPerSecond)
+{
+ process = new RateLimiterProcess(permitsPerSecond); // CHECKs permitsPerSecond > 0.
+ spawn(process); // Terminated in ~RateLimiter().
+}
+
+
inline RateLimiter::~RateLimiter()
{
terminate(process);
[2/5] git commit: Created framework rate limits protobuf object which is loaded as JSON through master flags.
Posted by ya...@apache.org.
Created framework rate limits protobuf object which is loaded as JSON through master flags.
Review: https://reviews.apache.org/r/22425
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2f9ffddf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2f9ffddf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2f9ffddf
Branch: refs/heads/master
Commit: 2f9ffddf98f544b778fdeee50a1ae09ec0e7c786
Parents: 4a0b349
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Mon Jun 16 17:21:24 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Tue Jun 17 14:10:20 2014 -0700
----------------------------------------------------------------------
include/mesos/mesos.proto | 30 ++++++++++++++++++++++++++++++
src/master/flags.hpp | 24 ++++++++++++++++++++++++
src/master/master.cpp | 12 ++++++++++++
src/master/master.hpp | 2 ++
4 files changed, 68 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2f9ffddf/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 709b8b1..2f6be05 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -746,3 +746,33 @@ message ACLs {
repeated ACL.HTTPGet http_get = 4;
repeated ACL.HTTPPut http_put = 5;
}
+
+
+/**
+ * Rate (queries per second, QPS) limit for messages from a framework to master.
+ * Strictly speaking, the limit applies to the combined rate of messages from all
+ * frameworks with the same principal.
+ */
+message RateLimit {
+ // Leaving QPS unset gives it unlimited rate (i.e., not throttled). If set, it must be positive.
+ optional double qps = 1;
+
+ // Principal of framework(s) to be throttled. Should match
+ // FrameworkInfo.principal and Credential.principal (if using authentication).
+ required string principal = 2;
+}
+
+
+/**
+ * Collection of RateLimit.
+ * Frameworks without rate limits defined here are not throttled.
+ * TODO(xujyan): Currently when a framework is not specified in 'limits' it is
+ * not throttled. This can be done more explicitly by adding a RateLimit entry
+ * without setting its 'qps'. We should consider adding an optional
+ * 'aggregate_default_qps' which can be used to throttle the frameworks not
+ * specified here.
+ */
+message RateLimits {
+ // Items must have unique principals; the master rejects duplicates at startup.
+ repeated RateLimit limits = 1;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/2f9ffddf/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index 7850e45..47bb0dc 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -237,6 +237,29 @@ public:
" }\n"
" ]\n"
"}");
+
+ add(&Flags::rate_limits,
+ "rate_limits",
+ "The value could be a JSON formatted string of rate limits\n"
+ "or a file path containing the JSON formatted rate limits used\n"
+ "for framework rate limiting.\n"
+ "Path could be of the form 'file:///path/to/file'\n"
+ "or '/path/to/file'.\n"
+ "\n"
+ "See the RateLimits protobuf in mesos.proto for the expected format.\n"
+ "\n"
+ "Example:\n"
+ "{\n"
+ " \"limits\": [\n"
+ " {\n"
+ " \"principal\": \"foo\",\n"
+ " \"qps\": 55.5\n"
+ " },\n"
+ " {\n"
+ " \"principal\": \"bar\"\n"
+ " }\n"
+ " ]\n"
+ "}");
}
bool version;
@@ -264,6 +287,7 @@ public:
bool authenticate_slaves;
Option<std::string> credentials;
Option<ACLs> acls;
+ Option<JSON::Object> rate_limits;
};
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2f9ffddf/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index fb5c770..c7357da 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -45,6 +45,7 @@
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
#include <stout/stringify.hpp>
#include <stout/utils.hpp>
#include <stout/uuid.hpp>
@@ -350,6 +351,17 @@ void Master::initialize()
LOG(INFO) << "Authorization enabled";
}
+ if (flags.rate_limits.isSome()) {
+ Try<RateLimits> limits_ =
+ ::protobuf::parse<RateLimits>(flags.rate_limits.get());
+ if (limits_.isError()) {
+ EXIT(1) << "Invalid RateLimits format: " << limits_.error()
+ << " (see --rate_limits flag)";
+ }
+ limits = limits_.get();
+ LOG(INFO) << "Framework rate limiting enabled";
+ }
+
hashmap<string, RoleInfo> roleInfos;
// Add the default role.
http://git-wip-us.apache.org/repos/asf/mesos/blob/2f9ffddf/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d55c4f5..d682613 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -510,6 +510,8 @@ private:
// Principals of authenticated frameworks/slaves keyed by PID.
hashmap<process::UPID, std::string> authenticated;
+ Option<RateLimits> limits;
+
int64_t nextFrameworkId; // Used to give each framework a unique ID.
int64_t nextOfferId; // Used to give each slot offer a unique ID.
int64_t nextSlaveId; // Used to give each slave a unique ID.
[4/5] git commit: Changed MessageEvent and ExitedEvent to be copyable.
Posted by ya...@apache.org.
Changed MessageEvent and ExitedEvent to be copyable.
Review: https://reviews.apache.org/r/22426
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ebdb81ea
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ebdb81ea
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ebdb81ea
Branch: refs/heads/master
Commit: ebdb81ea99efc8fab0a50fb42b5b7a140787bc86
Parents: 2f9ffdd
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Sat Jun 7 21:10:55 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Tue Jun 17 14:10:20 2014 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/event.hpp | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ebdb81ea/3rdparty/libprocess/include/process/event.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
index e30a4d4..3c860f4 100644
--- a/3rdparty/libprocess/include/process/event.hpp
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -76,6 +76,9 @@ struct MessageEvent : Event
explicit MessageEvent(Message* _message)
: message(_message) {}
+ MessageEvent(const MessageEvent& that) // Deep copy: each event owns (and deletes) its own Message.
+ : message(that.message == NULL ? NULL : new Message(*that.message)) {}
+
virtual ~MessageEvent()
{
delete message;
@@ -89,8 +92,10 @@ struct MessageEvent : Event
Message* const message;
private:
- // Not copyable, not assignable.
- MessageEvent(const MessageEvent&);
+ // Keep MessageEvent not assignable even though we made it
+ // copyable.
+ // Note that we are violating the "rule of three" here but it helps
+ // keep the fields const.
MessageEvent& operator = (const MessageEvent&);
};
@@ -170,8 +175,9 @@ struct ExitedEvent : Event
const UPID pid;
private:
- // Not copyable, not assignable.
- ExitedEvent(const ExitedEvent&);
+ // Keep ExitedEvent not assignable even though we made it copyable.
+ // Note that we are violating the "rule of three" here but it helps
+ // keep the fields const.
ExitedEvent& operator = (const ExitedEvent&);
};
[3/5] git commit: Added "per-framework-principal" counters for messages from a scheduler to the master.
Posted by ya...@apache.org.
Added "per-framework-principal" counters for messages from a scheduler to the master.
Review: https://reviews.apache.org/r/22316
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d19e5880
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d19e5880
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d19e5880
Branch: refs/heads/master
Commit: d19e58802fff4f62e2422cded8bfff902d131b64
Parents: 7d7d94b
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Mon Jun 9 22:05:22 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Tue Jun 17 14:10:20 2014 -0700
----------------------------------------------------------------------
src/Makefile.am | 1 +
src/master/master.cpp | 72 ++++-
src/master/master.hpp | 50 +++
src/tests/rate_limiting_tests.cpp | 554 +++++++++++++++++++++++++++++++++
4 files changed, 675 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 7ae6760..b1b7d2d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -987,6 +987,7 @@ mesos_tests_SOURCES = \
tests/monitor_tests.cpp \
tests/paths_tests.cpp \
tests/protobuf_io_tests.cpp \
+ tests/rate_limiting_tests.cpp \
tests/reconciliation_tests.cpp \
tests/registrar_tests.cpp \
tests/repair_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 4a01b1a..fb5c770 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -84,6 +84,8 @@ using process::Time;
using process::Timer;
using process::UPID;
+using process::metrics::Counter;
+
using memory::shared_ptr;
namespace mesos {
@@ -750,6 +752,19 @@ void Master::exited(const UPID& pid)
void Master::visit(const MessageEvent& event)
{
+ // Increment the "message_received" counter if the message is from
+ // a framework and such a counter is configured for it.
+ // See comments for 'Master::Metrics::Frameworks' and
+ // 'Master::Frameworks::principals' for details.
+ const Option<string> principal =
+ frameworks.principals.get(event.message->from);
+ if (principal.isSome()) {
+ CHECK(metrics.frameworks.contains(principal.get()));
+ Counter messages_received =
+ metrics.frameworks.get(principal.get()).get()->messages_received;
+ ++messages_received;
+ }
+
// All messages are filtered when non-leading.
if (!elected()) {
VLOG(1) << "Dropping '" << event.message->name << "' message since "
@@ -773,6 +788,15 @@ void Master::visit(const MessageEvent& event)
}
ProtobufProcess<Master>::visit(event);
+
+ // Increment 'messages_processed' counter if it still exists.
+ // Note that it could be removed in handling
+ // 'UnregisterFrameworkMessage'.
+ if (principal.isSome() && metrics.frameworks.contains(principal.get())) {
+ Counter messages_processed =
+ metrics.frameworks.get(principal.get()).get()->messages_processed;
+ ++messages_processed;
+ }
}
@@ -3567,6 +3591,30 @@ void Master::addFramework(Framework* framework)
allocator->frameworkAdded(
framework->id, framework->info, framework->resources);
+
+ // Export framework metrics.
+
+ // If the framework is authenticated, its principal should be in
+ // 'authenticated'. Otherwise look if it's supplied in
+ // FrameworkInfo.
+ Option<string> principal = authenticated.get(framework->pid);
+ if (principal.isNone() && framework->info.has_principal()) {
+ principal = framework->info.principal();
+ }
+
+ // Export framework metrics if a principal is specified.
+ if (principal.isSome()) {
+ CHECK(!frameworks.principals.contains(framework->pid));
+ frameworks.principals.put(framework->pid, principal.get());
+
+ // Create new framework metrics if this framework is the first
+ // one of this principal. Otherwise existing metrics are reused.
+ if (!metrics.frameworks.contains(principal.get())) {
+ metrics.frameworks.put(
+ principal.get(),
+ Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get())));
+ }
+ }
}
@@ -3574,7 +3622,7 @@ void Master::addFramework(Framework* framework)
// event of a scheduler failover.
void Master::failoverFramework(Framework* framework, const UPID& newPid)
{
- const UPID& oldPid = framework->pid;
+ const UPID oldPid = framework->pid;
// There are a few failover cases to consider:
// 1. The pid has changed. In this case we definitely want to
@@ -3622,6 +3670,13 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
offer->framework_id(), offer->slave_id(), offer->resources());
removeOffer(offer);
}
+
+ // 'Failover' the framework's metrics. i.e., change the lookup key
+ // for its metrics to 'newPid'.
+ if (oldPid != newPid && frameworks.principals.contains(oldPid)) {
+ frameworks.principals[newPid] = frameworks.principals[oldPid];
+ frameworks.principals.erase(oldPid);
+ }
}
@@ -3702,7 +3757,20 @@ void Master::removeFramework(Framework* framework)
// Remove the framework from authenticated.
authenticated.erase(framework->pid);
- // Remove it.
+ // Remove the framework's message counters.
+ const Option<string> principal = frameworks.principals.get(framework->pid);
+ if (principal.isSome()) {
+ frameworks.principals.erase(framework->pid);
+
+ // Remove the metrics for the principal if this framework is the
+ // last one with this principal.
+ if (!frameworks.principals.containsValue(principal.get())) {
+ CHECK(metrics.frameworks.contains(principal.get()));
+ metrics.frameworks.erase(principal.get());
+ }
+ }
+
+ // Remove the framework.
frameworks.activated.erase(framework->id);
allocator->frameworkRemoved(framework->id);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 7a12185..d55c4f5 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -484,6 +484,16 @@ private:
hashmap<FrameworkID, Framework*> activated;
boost::circular_buffer<memory::shared_ptr<Framework> > completed;
+
+ // Principals of frameworks keyed by PID.
+ // NOTE: Multiple PIDs can map to the same principal. The
+ // differences between this map and 'authenticated' are:
+ // 1) This map only includes *registered* frameworks. The mapping
+ // is added when a framework (re-)registers.
+ // 2) This map includes unauthenticated frameworks (when Master
+ // allows them) if they have principals specified in
+ // FrameworkInfo.
+ hashmap<process::UPID, std::string> principals;
} frameworks;
hashmap<OfferID, Offer*> offers;
@@ -544,6 +554,46 @@ private:
// Message counters.
process::metrics::Counter dropped_messages;
+ // Metrics specific to frameworks of a common principal.
+ // These metrics have names prefixed by "frameworks/<principal>/".
+ struct Frameworks
+ {
+ // Counters for messages from all frameworks of this principal.
+ // Note: We only count messages from active scheduler
+ // *instances* while they are *registered*. I.e., messages
+ // prior to the completion of (re)registration
+ // (AuthenticateMessage and (Re)RegisterFrameworkMessage) and
+ // messages from an inactive scheduler instance (after the
+ // framework has failed over) are not counted.
+
+ // Framework messages received (before processing).
+ process::metrics::Counter messages_received;
+
+ // Framework messages processed.
+ // NOTE: This doesn't include dropped messages. Also due to
+ // Master's asynchronous nature, this doesn't necessarily mean
+ // the work requested by this message has finished.
+ process::metrics::Counter messages_processed;
+
+ explicit Frameworks(const std::string& principal) // Registers both counters.
+ : messages_received("frameworks/" + principal + "/messages_received"),
+ messages_processed("frameworks/" + principal + "/messages_processed")
+ {
+ process::metrics::add(messages_received);
+ process::metrics::add(messages_processed);
+ }
+
+ ~Frameworks() // Unregisters the counters added by the constructor.
+ {
+ process::metrics::remove(messages_received);
+ process::metrics::remove(messages_processed);
+ }
+ };
+
+ // Per-framework-principal metrics keyed by the framework
+ // principal.
+ hashmap<std::string, process::Owned<Frameworks> > frameworks;
+
// Messages from schedulers.
process::metrics::Counter messages_register_framework;
process::metrics::Counter messages_reregister_framework;
http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
new file mode 100644
index 0000000..baf912e
--- /dev/null
+++ b/src/tests/rate_limiting_tests.cpp
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/pid.hpp>
+
+#include <process/metrics/metrics.hpp>
+
+#include "master/allocator.hpp"
+#include "master/flags.hpp"
+#include "master/master.hpp"
+
+#include "tests/mesos.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::master::allocator::AllocatorProcess;
+
+using process::metrics::internal::MetricsProcess;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+
+using std::string;
+
+using testing::_;
+using testing::AtLeast;
+using testing::Eq;
+using testing::Return;
+
+// Test fixture for framework API rate limiting, including the
+// metrics that export per-principal API call rates. It adds nothing
+// to MesosTest; it only groups the tests below.
+class RateLimitingTest : public MesosTest
+{
+};
+
+
+// Verify that message counters for a framework are added when a
+// framework registers and removed when it terminates.
+TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Fully qualified names of the two per-principal counters checked
+  // throughout this test.
+  const string messagesReceived =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+  const string messagesProcessed =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+
+  // Message counters not present before the framework registers.
+  {
+    // TODO(xujyan): It would be nice to refactor out the common
+    // metrics snapshot querying logic into a helper method/MACRO.
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(0u, metrics.values.count(messagesReceived));
+    EXPECT_EQ(0u, metrics.values.count(messagesProcessed));
+  }
+
+  MockScheduler sched;
+  // Create MesosSchedulerDriver on the heap because of the need to
+  // destroy it during the test due to MESOS-1456.
+  MesosSchedulerDriver* driver = new MesosSchedulerDriver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  ASSERT_EQ(DRIVER_RUNNING, driver->start());
+
+  AWAIT_READY(registered);
+
+  // Message counters added after the framework is registered.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(1u, metrics.values.count(messagesReceived));
+    EXPECT_EQ(1u, metrics.values.count(messagesProcessed));
+  }
+
+  Future<Nothing> frameworkRemoved =
+    FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+  driver->stop();
+  driver->join();
+  delete driver;
+
+  AWAIT_READY(frameworkRemoved);
+
+  // Message counters removed after the framework is unregistered.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(0u, metrics.values.count(messagesReceived));
+    EXPECT_EQ(0u, metrics.values.count(messagesProcessed));
+  }
+
+  Shutdown();
+}
+
+
+// Verify that framework message counters work with frameworks of
+// different principals.
+TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
+{
+  // 1. Register two frameworks.
+  // Framework authentication is disabled because the drivers below
+  // are created without credentials.
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  FrameworkInfo frameworkInfo1; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo1.set_principal("framework1");
+
+  MockScheduler sched1;
+  // Create MesosSchedulerDriver on the heap because of the need to
+  // destroy it during the test due to MESOS-1456.
+  MesosSchedulerDriver* driver1 =
+    new MesosSchedulerDriver(&sched1, frameworkInfo1, master.get());
+
+  {
+    Future<Nothing> registered;
+    EXPECT_CALL(sched1, registered(driver1, _, _))
+      .WillOnce(FutureSatisfy(&registered));
+
+    ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+
+    AWAIT_READY(registered);
+  }
+
+  FrameworkInfo frameworkInfo2; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo2.set_principal("framework2");
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(&sched2, frameworkInfo2, master.get());
+
+  {
+    Future<Nothing> registered;
+    EXPECT_CALL(sched2, registered(&driver2, _, _))
+      .WillOnce(FutureSatisfy(&registered));
+
+    ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+    AWAIT_READY(registered);
+  }
+
+  // 2. Verify that both frameworks have message counters added.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework1/messages_received"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework1/messages_processed"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework2/messages_received"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework2/messages_processed"));
+  }
+
+  // 3. Remove a framework.
+  Future<Nothing> frameworkRemoved =
+    FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+  driver1->stop();
+  driver1->join();
+  delete driver1;
+
+  AWAIT_READY(frameworkRemoved);
+
+  // 4. Its message counters are deleted while the other framework's
+  // counters stay.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(
+        0u, metrics.values.count("frameworks/framework1/messages_received"));
+    EXPECT_EQ(
+        0u, metrics.values.count("frameworks/framework1/messages_processed"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework2/messages_received"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework2/messages_processed"));
+  }
+
+  driver2.stop();
+  driver2.join();
+
+  Shutdown();
+}
+
+
+// Verify that if multiple frameworks use the same principal, they
+// share the same counters and removing one framework doesn't remove
+// the counters.
+TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Fully qualified names of the counters shared by both frameworks,
+  // which both register with DEFAULT_CREDENTIAL.
+  const string messagesReceived =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+  const string messagesProcessed =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+
+  MockScheduler sched1;
+  // Create MesosSchedulerDriver on the heap because of the need to
+  // destroy it during the test due to MESOS-1456.
+  MesosSchedulerDriver* driver1 = new MesosSchedulerDriver(
+      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  {
+    Future<Nothing> registered;
+    EXPECT_CALL(sched1, registered(driver1, _, _))
+      .WillOnce(FutureSatisfy(&registered));
+
+    ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+
+    AWAIT_READY(registered);
+  }
+
+  // 'sched2' uses the same principal "test-principal".
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(
+      &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  {
+    Future<Nothing> registered;
+    EXPECT_CALL(sched2, registered(&driver2, _, _))
+      .WillOnce(FutureSatisfy(&registered));
+
+    ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+    AWAIT_READY(registered);
+  }
+
+  // Message counters added after both frameworks are registered.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(1u, metrics.values.count(messagesReceived));
+    EXPECT_EQ(1u, metrics.values.count(messagesProcessed));
+  }
+
+  Future<Nothing> frameworkRemoved =
+    FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+  driver1->stop();
+  driver1->join();
+  delete driver1;
+
+  AWAIT_READY(frameworkRemoved);
+
+  // Message counters are not removed after the first framework is
+  // unregistered.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(1u, metrics.values.count(messagesReceived));
+    EXPECT_EQ(1u, metrics.values.count(messagesProcessed));
+  }
+
+  driver2.stop();
+  driver2.join();
+
+  Shutdown();
+}
+
+
+// Verify that when a scheduler fails over, the new scheduler
+// instance continues to use the same counters.
+TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Fully qualified names of the counters shared by both scheduler
+  // instances (same framework, same principal). Held by value: the
+  // right-hand sides are temporaries.
+  const string messagesReceived =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+  const string messagesProcessed =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+
+  // 1. Launch the first (i.e., failing) scheduler and verify its
+  // counters.
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched1, registered(&driver1, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  {
+    // Grab this message so we can resend it.
+    Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+        RegisterFrameworkMessage(), _, master.get());
+
+    // Grab this message to get the scheduler's pid.
+    Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+        Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+    ASSERT_EQ(DRIVER_RUNNING, driver1.start());
+
+    AWAIT_READY(registerFrameworkMessage);
+    AWAIT_READY(frameworkRegisteredMessage);
+    AWAIT_READY(frameworkId);
+
+    const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+    // Send a duplicate RegisterFrameworkMessage. Master replies
+    // with a duplicate FrameworkRegisteredMessage.
+    Future<process::Message> duplicateFrameworkRegisteredMessage =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     _);
+
+    process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+
+    // Now one message has been received and processed by Master in
+    // addition to the RegisterFrameworkMessage.
+    AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+    // Settle to make sure messages_processed counters are updated.
+    Clock::pause();
+    Clock::settle();
+    Clock::resume();
+
+    // Verify the message counters.
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    // One message received and processed after the framework is
+    // registered. Presence is ASSERTed rather than EXPECTed because
+    // 'values[...]' on a missing key would insert a default-constructed
+    // value (presumably not a JSON::Number), and 'as<JSON::Number>()'
+    // would then fail instead of reporting a clean test failure.
+    ASSERT_EQ(1u, metrics.values.count(messagesReceived));
+    EXPECT_EQ(1, metrics.values[messagesReceived].as<JSON::Number>().value);
+
+    ASSERT_EQ(1u, metrics.values.count(messagesProcessed));
+    EXPECT_EQ(1, metrics.values[messagesProcessed].as<JSON::Number>().value);
+  }
+
+  // 2. Now launch the second (i.e., failover) scheduler using the
+  // framework id recorded from the first scheduler and verify that
+  // its counters are not reset to zero.
+
+  MockScheduler sched2;
+
+  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
+  framework2 = DEFAULT_FRAMEWORK_INFO;
+  framework2.mutable_id()->MergeFrom(frameworkId.get());
+
+  MesosSchedulerDriver driver2(
+      &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);
+
+  // Scheduler driver ignores duplicate FrameworkRegisteredMessages.
+  EXPECT_CALL(sched2, registered(&driver2, frameworkId.get(), _))
+    .Times(1);
+
+  Future<Nothing> sched1Error;
+  EXPECT_CALL(sched1, error(&driver1, "Framework failed over"))
+    .WillOnce(FutureSatisfy(&sched1Error));
+
+  {
+    // Grab this message to get the scheduler's pid and to make sure we
+    // wait until the framework is registered.
+    Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+        Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+    // Grab this message so we can resend it.
+    Future<ReregisterFrameworkMessage> reregisterFrameworkMessage =
+      FUTURE_PROTOBUF(ReregisterFrameworkMessage(), _, master.get());
+
+    ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+    AWAIT_READY(reregisterFrameworkMessage);
+    AWAIT_READY(sched1Error);
+    AWAIT_READY(frameworkRegisteredMessage);
+
+    const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+    Future<process::Message> duplicateFrameworkRegisteredMessage =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     _);
+
+    // Sending a duplicate ReregisterFrameworkMessage to test the
+    // message counters with the new scheduler instance.
+    process::post(schedulerPid, master.get(), reregisterFrameworkMessage.get());
+
+    AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+    // Settle to make sure messages_processed counters are updated.
+    Clock::pause();
+    Clock::settle();
+    Clock::resume();
+
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    // Another message after sched2 is reregistered plus the one from
+    // sched1. Presence is ASSERTed before indexing, as above.
+    ASSERT_EQ(1u, metrics.values.count(messagesReceived));
+    EXPECT_EQ(2, metrics.values[messagesReceived].as<JSON::Number>().value);
+
+    ASSERT_EQ(1u, metrics.values.count(messagesProcessed));
+    EXPECT_EQ(2, metrics.values[messagesProcessed].as<JSON::Number>().value);
+  }
+
+  EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver2.join());
+
+  EXPECT_EQ(DRIVER_ABORTED, driver1.stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver1.join());
+
+  Shutdown();
+}